07-17-2022 06:53 AM
Hey there Community!! I'm using dlt.apply_changes in my DLT job as follows:
dlt.apply_changes(
    target = "employee_silver",
    source = "employee_bronze_clean_v",
    keys = ["EMPLOYEE_ID"],
    sequence_by = col("last_updated"),
    apply_as_deletes = expr("Op = 'D'"),
    except_column_list = ["Op", "_rescued_data"])
The apply_as_deletes line isn't working. I'm using AWS Database Migration Service in ongoing replication mode, which writes CSV files to S3 with an Op column whose values are one of I / U / D (Insert, Update, Delete respectively). I can't figure out why my delete transactions aren't being processed by this pipeline.
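For context, the Bronze table those rows land in is loaded from the DMS CSV drops with Auto Loader, roughly like this (just a sketch of my setup; the S3 path is a placeholder):
import dlt

@dlt.table(name="employee_bronze")
def employee_bronze():
    # Auto Loader picks up the CSV files that DMS writes to S3 (placeholder path);
    # anything that doesn't match the inferred schema lands in _rescued_data.
    return (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load("s3://my-dms-bucket/employee/"))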
Any insight is GREATLY appreciated! thanks!
07-17-2022 08:09 AM
First try expr("Operation = 'DELETE'") for your apply_as_deletes
07-17-2022 08:10 AM
ok....will give it a try momentarily and report back here
07-17-2022 10:08 AM
@Alex Barreto I gave that a shot and it errored out saying no column DELETE exists. Here is the CDC file that I'm loading (fake data, of course):
Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated
D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,8300,,205,110,2021-10-06 21:26:04
So I have to use expr("Op = 'D'"), and that's what isn't working. When I use Databricks SQL to query my Bronze table, I see two rows for this primary key (206): one with an 'I' (the initial insert) and one with a 'D' (the delete transaction from the source, MySQL Aurora). But it's that 'D' row that isn't flowing into my Silver table.
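(For reference, this is roughly the check I ran, shown here as PySpark; employee_bronze is the table in my pipeline's target schema:)
# Sanity check: both CDC rows for employee 206 are present in Bronze,
# so the problem has to be downstream in apply_changes.
spark.sql("""
    SELECT Op, EMPLOYEE_ID, last_updated
    FROM employee_bronze
    WHERE EMPLOYEE_ID = 206
    ORDER BY last_updated
""").show()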
07-17-2022 10:17 AM
@Alex Barreto Think I got it now!! I was originally using sequence_by = col("last_updated") in my apply_changes code, which is a timestamp that comes from the source table. However, I realized that I have an intermediate dlt.view function that does some rudimentary data quality checks (dlt.expect), and in that function I also do this:
@dlt.view
def employee_bronze_clean_v():
    return (dlt.read_stream("employee_bronze")
        .withColumn('inputFileName', F.input_file_name())   # source file, kept for lineage
        .withColumn('LoadDate', F.lit(datetime.now())))      # timestamp captured when the pipeline update runs
I then used sequence_by = col("LoadDate") in dlt.apply_changes and presto... the row is gone from my Silver table!!!
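For anyone following along, the working call is essentially my original one with only sequence_by swapped:
dlt.apply_changes(
    target = "employee_silver",
    source = "employee_bronze_clean_v",
    keys = ["EMPLOYEE_ID"],
    sequence_by = col("LoadDate"),               # was col("last_updated")
    apply_as_deletes = expr("Op = 'D'"),
    except_column_list = ["Op", "_rescued_data"])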
But is it really gone? Is it still available via time travel? Does it still exist in the underlying Parquet files? I worry about this because of privacy laws like GDPR, CCPA, etc.
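(My tentative understanding, still to be confirmed: the delete only writes a new table version, so the old row can remain in the underlying Parquet files and stay reachable through time travel until those files age past the retention window and get vacuumed. Something like the following is what I'd run to physically purge them; the 168 hours is just the default 7-day retention:)
# Physically remove data files that are no longer referenced by the current
# table version and are older than the retention window (168 hours = 7 days).
spark.sql("VACUUM employee_silver RETAIN 168 HOURS")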
p.s. thanks again for helping out!