cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

CDC with Delta Live Tables, with AutoLoader, isn't applying 'deletes'

BradSheridan
Valued Contributor

Hey there Community!! I'm using dlt.apply_changes in my DLT job as follows:

dlt.apply_changes(

 target = "employee_silver", 

 source = "employee_bronze_clean_v", 

 keys = ["EMPLOYEE_ID"], 

 sequence_by = col("last_updated"), 

 apply_as_deletes = expr("Op = 'D'"), 

 except_column_list = ["Op", "_rescued_data"])

The apply_as_deletes line of code isn't working. I'm using AWS Database Migration Service and in "ongoing replication mode", it creates csv files in S3 with a column Op and the values are one of I / U / D (Insert, Update, Delete respectively). I can't figure out why my delete transaction isn't being processed by this pipeline.

Any insight is GREATLY appreciated! thanks!

4 REPLIES 4

axb0
New Contributor III

First try expr("Operation = 'DELETE'") for your apply_as_deletes

BradSheridan
Valued Contributor

ok....will give it a try momentarily and report back here

BradSheridan
Valued Contributor

@Alex Barreto​ i gave that a shot and it errored out saying no column DELETE exists. Here is the CDC file that I'm loading (fake data of course):

Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated

D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,8300,,205,110,2021-10-06 21:26:04

So I have to use expr("Op = 'D'") and that's what isn't working. When I use Databricks SQL to query my Bronze table, I see 2 rows for this primary key (206)...one with an 'I' (for this initial insert) and one with a 'D' (for the deleted transaction from the source (MySQL Aurora). But it's that D row that isn't flowing into my Silver table

BradSheridan
Valued Contributor

@Alex Barreto​ Think I got it now!! I was originally using  sequence_by = col("createDate") in my apply_changes code, which comes from the source table. However, I realized that I have an intermediate dlt.view function to do some rudimentary data quality checks (dlt.expect) and in that function, I also do this:

def employee_bronze_clean_v():

 return dlt.read_stream("employee_bronze") \

 .withColumn('inputFileName',F.input_file_name()) \

 .withColumn('LoadDate',F.lit(datetime.now()))

I then used LoadDate in my sequence_by = col("LoadDate") in the dlt.apply_changes and presto....the row is gone from my Silver table!!!

But is it gone? is it still available in time travel? does it still exist in the parquet file? I worry about this for the privacy laws like GDPR, CCPA, etc...

p.s. thanks again for helping out!

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group