
DLT apply_as_deletes not working on existing data with full refresh

sdes10
New Contributor II

I have an existing DLT pipeline that follows a modified medallion architecture. Data is sent from Debezium to Kafka and lands in a bronze table. From the bronze table it goes to a silver table, where it is schematized, and finally to a gold table, where I use apply_changes to handle upserts and deletes. I added apply_as_deletes to this existing pipeline and table, and I am seeing a weird issue: the deletes work on new incoming data, but when I do a full refresh of the silver and gold tables, existing data is not deleted from the gold table. Instead, I am seeing rows that have both __UPSERTVERSION and __DELETEVERSION set.

The rows exist in the silver table like this:

id, metadata, operation_ts, op
1, m1, ts1, c
1, m1, ts2, d

My apply_changes call looks like this:

import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table(name=dest_scd_table)
dlt.apply_changes(
    target = dest_scd_table,              # the table being materialized
    source = source_table,                # the incoming CDC feed
    keys = keys,                          # columns used to match the rows to upsert
    sequence_by = col(seq_col),           # deduplicate by operation date, keeping the most recent value
    ignore_null_updates = False,
    stored_as_scd_type = scd_type,
    apply_as_deletes = expr("op = 'd'"),  # if a row is deleted in Postgres, delete it in the _final table; it still exists in the append-only tables
    except_column_list = [col("op")]      # columns from the silver tables we do not want to appear in the gold tables
)

This query gives me the correct count if I filter by it:

select count(*) from <table> where __DROP_EXPECTATIONS_COL is not null
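
A related check, using the internal column names I see on the gold table, would be to count the rows that were both upserted and marked deleted (hypothetical query, same placeholder table as above):

select count(*) from <table> where __UPSERTVERSION is not null and __DELETEVERSION is not null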

3 REPLIES

sdes10
New Contributor II

For context, I started a new pipeline, and the gold table in the DLT UI says 38k upserted records and 0 deleted records. The gold table has ~200k records (including records that should have been deleted).

Sidhant07
Databricks Employee

Hi @sdes10,

If full refresh is not working as expected, consider using incremental refresh for materialized views. Incremental refresh attempts to process only new or changed data, which might help in correctly applying deletes.

If you are using streaming tables, you can set the skipChangeCommits option to ignore file-changing operations.
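
For example, in a DLT Python pipeline the option goes on the streaming read that feeds the table. A minimal sketch (the table names here are placeholders for your bronze and silver tables):

import dlt

@dlt.table(name="silver_events")
def silver_events():
    return (
        spark.readStream
        .option("skipChangeCommits", "true")  # ignore commits that update or delete rows in the source table
        .table("LIVE.bronze_events")
    )

Note that skipChangeCommits skips the changed rows entirely, so it only fits cases where downstream tables do not need those updates or deletes.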

sdes10
New Contributor II

@Sidhant07 How do I use skipChangeCommits? The idea is that I already have bronze, silver, and gold tables built. Now I am enabling deletes on the gold table via the apply_changes API. The silver table has a new operation column (values c, u, r, d). I did a full refresh of the silver table to populate this operation column, and then want the gold table to delete the rows with operation = 'd'. So I did a full refresh of silver and gold. The silver table gets the operation values populated, but in the gold table the rows with operation = 'd' persist. The values of the __UpsertVersion and __DeleteVersion columns are the same, which I believe is what is causing them to not get deleted. I don't know how these columns get populated.
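
One thing I may try, assuming the tie on the sequencing column really is the cause: derive an explicit tie-breaker in the source view and sequence by a struct, so that a delete sharing an operation_ts with an insert sorts after it. This is an unverified sketch (the view, table, and column names are illustrative, and it assumes sequence_by accepts a struct() in your runtime):

import dlt
from pyspark.sql.functions import col, expr, struct, when

# Hypothetical helper view: rank deletes above other operations so they win ties.
@dlt.view(name="silver_with_rank")
def silver_with_rank():
    return (
        dlt.read_stream("silver_table")
        .withColumn("op_rank", when(col("op") == "d", 2).otherwise(1))
    )

dlt.create_streaming_table(name="gold_table")
dlt.apply_changes(
    target = "gold_table",
    source = "silver_with_rank",
    keys = ["id"],
    # struct ordering compares fields left to right, so op_rank only matters
    # when two events share the same operation_ts
    sequence_by = struct("operation_ts", "op_rank"),
    apply_as_deletes = expr("op = 'd'"),
    except_column_list = ["op", "op_rank"]
)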
