How to deal with deleted files in source directory in DLT?

BenLambert
Contributor

We have a DLT pipeline that uses Auto Loader to detect files added to a source storage bucket. It reads those files and appends new records to a bronze streaming table. However, we would also like to automatically delete records from the bronze table when a source file is deleted. Is there a way to accomplish this without a full refresh? I've read about the new change data capture feature of DLT, but I'm not clear on how to generate the required CDC table in an automated way.
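For context, the bronze table is defined roughly like this (the bucket path, file format, and table name are placeholders for our actual setup):

import dlt

@dlt.table(name="bronze_events")
def bronze_events():
    # Auto Loader picks up new files as they land in the source bucket
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://source-bucket/events/")
    )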

1 REPLY

Anonymous
Not applicable

@Bennett Lambert:

Yes, it is possible to automatically delete records from the bronze table when a source file is deleted, without doing a full refresh. One way to achieve this is with Delta Lake's Change Data Feed (CDF), which is the change data capture (CDC) mechanism for Delta tables.

With CDF enabled, Delta records the row-level insert, update, and delete operations applied to a table, and you can read them back as a change feed. To drive the deletes themselves, you maintain a small change (CDC) table that records the keys of the records whose source files were removed, merge it into the bronze table to delete the matching rows, and let the change feed propagate those deletes to anything downstream.
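Enabling CDF is a table property rather than an API call; a minimal sketch, using the same bronze path as the example below:

# Enable Change Data Feed on the bronze Delta table so that row-level
# inserts, updates, and deletes are recorded alongside the data
spark.sql("""
    ALTER TABLE delta.`/mnt/s3/source/bronze`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")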

The CDC table itself is just a Delta table that you populate with one row per record to remove, tagged with an operation type. Once it exists, a MERGE deletes the matching rows from the bronze table. Here's an example (the primary key column and the processed_at bookkeeping column are placeholders for your schema):

from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

bronze_table_path = "/mnt/s3/source/bronze"
cdc_table_path = "/mnt/s3/source/cdc"

# Load the pending change records: one row per affected record, with an
# 'operation' column ('D' = delete) and a 'processed_at' bookkeeping column
cdc_df = (
    spark.read.format("delta")
    .load(cdc_table_path)
    .filter("operation = 'D' AND processed_at IS NULL")
)

# Delete the bronze records whose keys appear as pending delete operations
bronze_table = DeltaTable.forPath(spark, bronze_table_path)
(
    bronze_table.alias("b")
    .merge(cdc_df.alias("c"), "b.<primary key column> = c.<primary key column>")
    .whenMatchedDelete()
    .execute()
)

# Stamp the processed delete records so they are not replayed on the next run
cdc_table = DeltaTable.forPath(spark, cdc_table_path)
cdc_table.update(
    condition="operation = 'D' AND processed_at IS NULL",
    set={"processed_at": current_timestamp()},
)
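To populate the CDC table automatically, one option is to diff the files currently in the bucket against the files recorded in the bronze table. This is a sketch, assuming the bronze table captured the ingested file path in a source_file column at read time:

from pyspark.sql.functions import lit

# Files currently present in the source bucket
current_files = [f.path for f in dbutils.fs.ls("/mnt/s3/source/")]

# Bronze records whose source file has disappeared become delete rows
bronze_df = spark.read.format("delta").load(bronze_table_path)
gone = bronze_df.filter(~bronze_df.source_file.isin(current_files))

(
    gone.select("<primary key column>")
    .withColumn("operation", lit("D"))
    .withColumn("processed_at", lit(None).cast("timestamp"))
    .write.format("delta").mode("append").save(cdc_table_path)
)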

In this example, we load the pending delete records from the CDC table, use a MERGE with whenMatchedDelete to remove the matching rows from the bronze table, and then stamp the processed CDC rows with a timestamp so the same deletes are not applied twice. Because MERGE rewrites only the affected files, no full refresh of the bronze table is required.

You can run this code as a Databricks Job, or as part of your pipeline, to automate generating the CDC table and deleting records from the bronze table.
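If downstream (silver) tables need to drop the same records, they can consume the deletes from the bronze table's change feed instead of being fully recomputed. A minimal sketch, assuming CDF was enabled on the bronze table as above:

# Read only the delete events recorded in the bronze change feed
bronze_deletes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load("/mnt/s3/source/bronze")
    .filter("_change_type = 'delete'")
)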
