CDC with Delta Live Tables, with AutoLoader, isn't applying 'deletes'
07-17-2022 06:53 AM
Hey there Community!! I'm using dlt.apply_changes in my DLT job as follows:
dlt.apply_changes(
    target = "employee_silver",
    source = "employee_bronze_clean_v",
    keys = ["EMPLOYEE_ID"],
    sequence_by = col("last_updated"),
    apply_as_deletes = expr("Op = 'D'"),
    except_column_list = ["Op", "_rescued_data"]
)
The apply_as_deletes line isn't working. I'm using AWS Database Migration Service in ongoing replication mode, which writes CSV files to S3 with an Op column whose value is one of I / U / D (Insert, Update, Delete). I can't figure out why my delete transactions aren't being processed by this pipeline.
Any insight is GREATLY appreciated! thanks!
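For anyone skimming: roughly speaking, apply_changes keeps the latest row per key according to the sequence_by column, and drops a key when its latest change matches the apply_as_deletes expression. A toy plain-Python sketch of that "last writer wins" behavior (illustrative only, not the real DLT implementation; the `seq` field here stands in for the sequencing column):

```python
# Toy model of CDC "last writer wins" sequencing, roughly what
# dlt.apply_changes does per key. Illustrative only, not the real engine.
def apply_cdc(rows):
    """rows: dicts with 'Op', 'EMPLOYEE_ID', and a 'seq' ordering value."""
    target = {}
    for row in sorted(rows, key=lambda r: r["seq"]):
        key = row["EMPLOYEE_ID"]
        if row["Op"] == "D":
            target.pop(key, None)   # apply_as_deletes: drop the key
        else:                       # 'I' or 'U': upsert the latest image
            target[key] = row
    return target

events = [
    {"Op": "I", "EMPLOYEE_ID": 206, "seq": 1},
    {"Op": "D", "EMPLOYEE_ID": 206, "seq": 2},  # delete sequences last, so it wins
]
print(apply_cdc(events))   # {} -> employee 206 removed
```

Note the flip side: if the delete's sequence value is not newer than the insert's, the delete is treated as stale and the row survives, which turns out to matter later in this thread.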
- Labels: Delta Live Tables, DLT
07-17-2022 08:09 AM
First, try expr("Operation = 'DELETE'") for your apply_as_deletes.
07-17-2022 08:10 AM
OK, will give it a try momentarily and report back here.
07-17-2022 10:08 AM
@Alex Barreto I gave that a shot and it errored out saying no column DELETE exists. Here is the CDC file that I'm loading (fake data, of course):
Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated
D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,8300,,205,110,2021-10-06 21:26:04
So I have to use expr("Op = 'D'"), and that's what isn't working. When I use Databricks SQL to query my Bronze table, I see 2 rows for this primary key (206): one with an 'I' (for the initial insert) and one with a 'D' (for the delete transaction from the source, MySQL Aurora). But it's that 'D' row that isn't flowing into my Silver table.
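A quick sanity check in plain Python (using the sample lines above) confirms the DMS header really exposes a column named Op, not Operation, so the original expression is the right one:

```python
import csv
import io

# The CDC sample from this thread, pasted verbatim (header + one delete row).
sample = (
    "Op,EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,"
    "JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,last_updated\n"
    "D,206,William,Gietz,WGIETZ,515.123.8181,2002-06-07,AC_ACCOUNT,"
    "8300,,205,110,2021-10-06 21:26:04\n"
)
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["Op"])   # D -> so the filter must be expr("Op = 'D'")
```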
07-17-2022 10:17 AM
@Alex Barreto Think I got it now!! I was originally using sequence_by = col("createDate") in my apply_changes code, which comes from the source table. However, I realized that I have an intermediate dlt.view function that does some rudimentary data quality checks (dlt.expect), and in that function I also do this:
@dlt.view
def employee_bronze_clean_v():
    return dlt.read_stream("employee_bronze") \
        .withColumn('inputFileName', F.input_file_name()) \
        .withColumn('LoadDate', F.lit(datetime.now()))
I then used sequence_by = col("LoadDate") in dlt.apply_changes, and presto, the row is gone from my Silver table!!!
But is it really gone? Is it still available via time travel? Does it still exist in the Parquet files? I worry about this because of privacy laws like GDPR, CCPA, etc.
p.s. thanks again for helping out!

