I have an existing DLT pipeline that follows a modified medallion architecture. Data is sent from Debezium to Kafka and lands in a bronze table. From the bronze table it flows to a silver table, where it is schematized, and finally to a gold table, where I use apply_changes to handle upserts and deletes. I added apply_as_deletes to an existing pipeline and table, and I am seeing a weird issue: the deletes work on newly arriving data, but when I do a full refresh of the silver and gold tables, existing data is not deleted from the gold table. Instead, I am seeing rows that have both __UPSERTVERSION and __DELETEVERSION set.
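For context, the bronze-to-silver step is basically just parsing the Debezium payload into typed columns. A minimal sketch of that step (table names, payload schema, and column names below are simplified placeholders, not my actual code):

import dlt
from pyspark.sql.functions import col, from_json

# Simplified payload schema; the real Debezium envelope is wider.
payload_schema = "id BIGINT, metadata STRING, operation_ts TIMESTAMP, op STRING"

@dlt.table(name="silver_table")  # placeholder name
def silver_table():
    # Parse the raw Kafka value coming from bronze into typed columns.
    return (
        dlt.read_stream("bronze_table")  # placeholder name
        .select(from_json(col("value").cast("string"), payload_schema).alias("payload"))
        .select("payload.*")
    )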
The rows exist in the silver table like this:
id, metadata, operation_ts, op
1, m1, ts1, c
1, m1, ts2, d
My apply_changes call looks like this:
import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table(name=dest_scd_table)
dlt.apply_changes(
    target = dest_scd_table,              # the table being materialized
    source = source_table,                # the incoming CDC feed
    keys = keys,                          # columns used to match rows for upserts
    sequence_by = col(seq_col),           # deduplicate by operation date, keeping the most recent value
    ignore_null_updates = False,
    stored_as_scd_type = scd_type,
    apply_as_deletes = expr("op = 'd'"),  # if a row is deleted in Postgres, delete it in the _final table; it still exists in the append-only tables
    except_column_list = [col("op")]      # columns from the silver tables we do not want to appear in the gold tables
)
This query gives me the correct count if I filter by it:
select count(*) from <table> where __DROP_EXPECTATIONS_COL is not null
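For what it's worth, the stale rows show up when I inspect the version columns directly, roughly like this (the table name below is a placeholder for the physical table backing the streaming target, and may differ in your workspace):

# Placeholder table name; inspecting the backing table that apply_changes maintains.
spark.sql("""
    SELECT id, __UPSERTVERSION, __DELETEVERSION
    FROM my_schema.__apply_changes_storage_dest_scd_table
    WHERE __UPSERTVERSION IS NOT NULL
      AND __DELETEVERSION IS NOT NULL
""").show()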