Data Engineering

CDC with Delta Live Tables, with AutoLoader, isn't applying 'deletes'

BradSheridan
Valued Contributor

Hey there Community!! I'm using dlt.apply_changes in my DLT job as follows:

import dlt
from pyspark.sql.functions import col, expr

dlt.apply_changes(
    target = "employee_silver",
    source = "employee_bronze_clean_v",
    keys = ["EMPLOYEE_ID"],
    sequence_by = col("last_updated"),
    apply_as_deletes = expr("Op = 'D'"),  # rows flagged D by DMS should be applied as deletes
    except_column_list = ["Op", "_rescued_data"])

The apply_as_deletes line isn't working. I'm using AWS Database Migration Service (DMS); in ongoing replication mode it writes CSV files to S3 with an Op column whose value is one of I / U / D (Insert, Update, Delete respectively). I can't figure out why my delete transaction isn't being processed by this pipeline.

Any insight is GREATLY appreciated! thanks!

4 REPLIES

axb0
New Contributor III

First try expr("Operation = 'DELETE'") for your apply_as_deletes

BradSheridan
Valued Contributor

ok....will give it a try momentarily and report back here

BradSheridan
Valued Contributor

@Alex Barreto I gave that a shot and it errored out, saying no column DELETE exists. Here is the CDC file that I'm loading (fake data, of course):

Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated

D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,8300,,205,110,2021-10-06 21:26:04

So I have to use expr("Op = 'D'"), and that's what isn't working. When I use Databricks SQL to query my Bronze table, I see 2 rows for this primary key (206): one with an 'I' (the initial insert) and one with a 'D' (the delete transaction from the source, MySQL Aurora). But it's that D row that isn't flowing into my Silver table.
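As a quick sanity check on the predicate itself, here is a minimal plain-Python sketch (no Spark or DLT involved) that parses the CDC sample above and applies the same test as expr("Op = 'D'"). The names CDC_SAMPLE and is_delete are made up for illustration. It only shows that the column is called Op and that the sample row matches the delete condition, which suggests the problem lies somewhere other than the expression.

```python
import csv
import io

# Header and delete record copied from the CDC sample in this post.
CDC_SAMPLE = (
    "Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,"
    "JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated\n"
    "D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,8300,,"
    "205,110,2021-10-06 21:26:04\n"
)


def is_delete(row):
    """Hypothetical plain-Python stand-in for expr("Op = 'D'")."""
    return row.get("Op") == "D"


rows = list(csv.DictReader(io.StringIO(CDC_SAMPLE)))

# Keys of the rows that the delete predicate would select.
delete_keys = [row["EMPLOYEE_ID"] for row in rows if is_delete(row)]
print(delete_keys)

# The file has no column named "Operation", so a predicate on it cannot match.
print("Operation" in rows[0])
```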

BradSheridan
Valued Contributor

@Alex Barreto Think I got it now!! I was originally using sequence_by = col("createDate") in my apply_changes code, which comes from the source table. However, I realized that I have an intermediate dlt.view function that does some rudimentary data quality checks (dlt.expect), and in that function I also do this:

def employee_bronze_clean_v():
    return dlt.read_stream("employee_bronze") \
        .withColumn('inputFileName',F.input_file_name()) \
        .withColumn('LoadDate',F.lit(datetime.now()))

I then switched to sequence_by = col("LoadDate") in the dlt.apply_changes call and presto... the row is gone from my Silver table!!!
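One plausible reading of why switching from createDate to LoadDate helped: createDate describes the employee record, so the 'I' row and the 'D' row for key 206 carry the same value and nothing orders the delete after the insert. The sketch below is a simplified plain-Python model of sequenced change application, not the DLT implementation. The function name apply_changes_model, the timestamp values, and the rule that a change is applied only when its sequence value is strictly greater than the last one seen for that key are all assumptions made for illustration.

```python
def apply_changes_model(events, seq_field, key_field="EMPLOYEE_ID"):
    """Toy model: apply a change only if it sequences strictly after the
    last change applied for the same key (an assumption, not DLT's code)."""
    target = {}    # key -> current row in the modelled Silver table
    last_seq = {}  # key -> sequence value of the last applied change
    for event in events:
        key = event[key_field]
        seq = event[seq_field]
        if key in last_seq and seq <= last_seq[key]:
            continue  # tied or out-of-order change is ignored in this model
        last_seq[key] = seq
        if event["Op"] == "D":
            target.pop(key, None)  # delete removes the row from the target
        else:
            target[key] = {c: v for c, v in event.items() if c != "Op"}
    return target


# Hypothetical values: both rows share createDate, but were loaded at different times.
events = [
    {"Op": "I", "EMPLOYEE_ID": 206,
     "createDate": "2021-10-01 09:00:00", "LoadDate": "2021-10-06 21:00:00"},
    {"Op": "D", "EMPLOYEE_ID": 206,
     "createDate": "2021-10-01 09:00:00", "LoadDate": "2021-10-06 21:30:00"},
]

by_create_date = apply_changes_model(events, "createDate")
by_load_date = apply_changes_model(events, "LoadDate")

print(sorted(by_create_date))  # keys still present when sequencing by createDate
print(sorted(by_load_date))    # keys still present when sequencing by LoadDate
```

In this model the row survives when sequencing by createDate and disappears when sequencing by LoadDate. Note that datetime.now() inside F.lit is evaluated in Python when the view's DataFrame is defined, not once per row, so rows processed under the same definition would share one LoadDate; a per-change timestamp from the source may be a safer sequencing column.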

But is it gone? Is it still available in time travel? Does it still exist in the Parquet file? I worry about this because of privacy laws like GDPR, CCPA, etc.

p.s. thanks again for helping out!