DLT apply_as_deletes not working on existing data with full refresh
3 weeks ago
I have an existing DLT pipeline that follows a modified medallion architecture. Data is sent from Debezium to Kafka and lands in a bronze table. From the bronze table, it goes to a silver table, where it is schematized, and finally to a gold table, where I use apply_changes to handle upserts and deletes. I added apply_as_deletes to an existing pipeline and table, and I am seeing a weird issue. Deletes work on newly arriving data, but when I do a full refresh of the silver and gold tables, existing data is not deleted from the table. Instead, I see rows that have both __UPSERTVERSION and __DELETEVERSION.
The rows exist in the silver table like this:
id, metadata, operation_ts, op
1, m1, ts1, c
1, m1, ts2, d
My apply_changes call looks like this:
import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table(name=dest_scd_table)
dlt.apply_changes(
    target = dest_scd_table,  # The table being materialized
    source = source_table,  # The incoming CDC feed
    keys = keys,  # The columns used to match rows for upserts
    sequence_by = col(seq_col),  # Deduplicate by operation timestamp, keeping the most recent value
    ignore_null_updates = False,
    stored_as_scd_type = scd_type,
    apply_as_deletes = expr("op = 'd'"),  # If a row is deleted in Postgres, delete it in the _final table. It still exists in the append-only tables.
    except_column_list = [col("op")]  # Columns from the silver table that should not appear in the gold table
)
This query gives me the correct count when I filter on this column:
select count(*) from <table> where __DROP_EXPECTATIONS_COL is not null
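For reference, the behaviour I expect from apply_changes with SCD type 1 can be sketched in plain Python. This is only a toy model of the intended result, not how DLT implements it: the function name `apply_changes_scd1` is made up, and the integer timestamps 1 and 2 stand in for ts1 and ts2.

```python
def apply_changes_scd1(events, key="id", seq="operation_ts", op="op", delete_op="d"):
    """Toy model: keep the latest event per key, then drop keys whose
    latest event is a delete. The op column is excluded from the output."""
    latest = {}
    for event in events:
        k = event[key]
        # Strictly greater: on a tie, the first event seen is kept.
        if k not in latest or event[seq] > latest[k][seq]:
            latest[k] = event
    return [
        {c: v for c, v in event.items() if c != op}
        for event in latest.values()
        if event[op] != delete_op
    ]


# Hypothetical silver rows; id 1 is created and then deleted.
silver_rows = [
    {"id": 1, "metadata": "m1", "operation_ts": 1, "op": "c"},
    {"id": 1, "metadata": "m1", "operation_ts": 2, "op": "d"},
    {"id": 2, "metadata": "m2", "operation_ts": 1, "op": "c"},
]

gold_rows = apply_changes_scd1(silver_rows)
print(gold_rows)
```

With distinct sequence values, id 1 should disappear from the result regardless of the order in which the rows are processed, which is what I expected from a full refresh.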
3 weeks ago
For context, I started a new pipeline, and the gold table in the DLT UI shows 38k upserted records and 0 deleted records. The gold table has ~200k records (including records that should have been deleted).
3 weeks ago
Hi @sdes10 ,
If full refresh is not working as expected, consider using incremental refresh for materialized views. Incremental refresh attempts to process only new or changed data, which might help in correctly applying deletes.
If you are using streaming tables, you can set the skipChangeCommits option to ignore file-changing operations.
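As a rough sketch, the option is set on the streaming read of the upstream Delta table. The helper name `read_stream_skipping_change_commits` and the table name are illustrative, and `spark` is assumed to be the SparkSession available in the pipeline notebook.

```python
def read_stream_skipping_change_commits(spark, table_name):
    """Return a streaming read of a Delta table that ignores commits
    which update or delete existing records (skipChangeCommits)."""
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table(table_name)
    )
```

Inside a pipeline this would typically be the body of a function decorated with `@dlt.table` or `@dlt.view`, for example `return read_stream_skipping_change_commits(spark, "my_silver_table")`. Note that the option makes the stream skip change commits in its source rather than propagate them, so it may not by itself address deletes that are missing in the target.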
3 weeks ago
@Sidhant07 How do I use skipChangeCommits? The idea is that I already have bronze, silver, and gold tables built. Now I am enabling deletes on the gold table through the apply_changes API. I added an operation column (values c, u, r, d) to the silver table. I did a full refresh of the silver table to populate this column, and I want the gold table to delete the rows with operation='d'. So I did a full refresh of both silver and gold. The silver table gets the operation value populated, but in the gold table the rows with operation='d' persist. The values of the __UpsertVersion and __DeleteVersion columns are the same, which I believe is what prevents the rows from being deleted. I don't know how these columns get populated.
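One way to test that suspicion is to look for keys in the silver data where a delete event carries the same sequence value as another event for the same key. The sketch below does this in plain Python on sample rows. It assumes, without confirmation in this thread, that __UpsertVersion and __DeleteVersion are derived from the sequence_by column; the function name `keys_with_tied_delete` and the sample values are hypothetical.

```python
from collections import defaultdict


def keys_with_tied_delete(events, key="id", seq="operation_ts", op="op", delete_op="d"):
    """Return the sorted keys for which a delete event shares its sequence
    value with at least one other kind of event for the same key."""
    ops_by_key_and_seq = defaultdict(lambda: defaultdict(set))
    for event in events:
        ops_by_key_and_seq[event[key]][event[seq]].add(event[op])
    tied = [
        k
        for k, by_seq in ops_by_key_and_seq.items()
        if any(delete_op in ops and len(ops) > 1 for ops in by_seq.values())
    ]
    return sorted(tied)


# Hypothetical rows: id 1 has a create and a delete with the same timestamp,
# id 2 has a delete that is strictly later than its create.
sample_events = [
    {"id": 1, "operation_ts": 5, "op": "c"},
    {"id": 1, "operation_ts": 5, "op": "d"},
    {"id": 2, "operation_ts": 1, "op": "c"},
    {"id": 2, "operation_ts": 2, "op": "d"},
]

tied_keys = keys_with_tied_delete(sample_events)
print(tied_keys)
```

If real data shows many such keys, the sequence column may not be fine-grained enough to order the create and delete events, which would be worth ruling out before looking elsewhere.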

