
How to deal with deleted files in source directory in DLT?

BenLambert
Contributor

We have a DLT pipeline that uses Auto Loader to detect files added to a source storage bucket. It reads these new files and appends their records to a bronze streaming table. However, we would also like to automatically delete records from the bronze table when a source file is deleted. Is there a way to accomplish this without a full refresh? I've read about the new change data capture feature of DLT, but I'm not exactly clear on how to generate the required CDC table in an automated way.
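A minimal sketch of such a pipeline, assuming JSON files and placeholder paths (the source_file column is optional bookkeeping that records where each row came from):

import dlt
from pyspark.sql.functions import input_file_name

@dlt.table(name="bronze")
def bronze():
    # Auto Loader (cloudFiles) incrementally picks up new files
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")  # placeholder format
        .load("/mnt/s3/source/")              # placeholder path
        .withColumn("source_file", input_file_name())
    )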

1 REPLY

Anonymous
Not applicable

@Bennett Lambert:

Yes, it is possible to automatically delete records from the bronze table when a source file is deleted, without doing a full refresh. One way to achieve this is the Change Data Capture (CDC) pattern: maintain a change table alongside the bronze Delta table and apply it with a MERGE.

The CDC table records the insert, update, and delete operations against the bronze table; at minimum it needs each record's key, an operation flag, and a timestamp. When a source file disappears, you append delete events for its records to the CDC table and then use that table to remove the corresponding rows from bronze.
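As a sketch of how the delete events might be generated automatically (the column names id, source_file, operation, event_time, and processed_at are assumptions, as is detecting deletions by listing the bucket):

from pyspark.sql.functions import col, current_timestamp, lit

bronze_table_path = "/mnt/s3/source/bronze"
cdc_table_path = "/mnt/s3/source/cdc"
source_dir = "/mnt/s3/source/"

# Files currently present in the bucket (dbutils is available on
# Databricks; path schemes may need normalizing to match the values
# that input_file_name() produced at ingestion time)
files_in_bucket = {f.path for f in dbutils.fs.ls(source_dir)}

# Files the bronze table has ingested (assumes a source_file column)
files_in_bronze = {
    row.source_file
    for row in spark.read.format("delta")
    .load(bronze_table_path)
    .select("source_file")
    .distinct()
    .collect()
}

# Any file bronze knows about that is gone from the bucket was deleted
deleted_files = files_in_bronze - files_in_bucket

if deleted_files:
    # Append one delete event per affected record to the CDC table;
    # `id` stands in for your primary key column
    (
        spark.read.format("delta")
        .load(bronze_table_path)
        .filter(col("source_file").isin(list(deleted_files)))
        .select("id")
        .withColumn("operation", lit("D"))
        .withColumn("event_time", current_timestamp())
        .withColumn("processed_at", lit(None).cast("timestamp"))
        .write.format("delta")
        .mode("append")
        .save(cdc_table_path)
    )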

Once the CDC table is populated, you apply it to the bronze table with a Delta MERGE. Here's an example:

from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

bronze_table_path = "/mnt/s3/source/bronze"
cdc_table_path = "/mnt/s3/source/cdc"

# Load the CDC table as a DataFrame so it can drive the MERGE below
cdc_df = spark.read.format("delta").load(cdc_table_path)

# Delete records from the bronze table whose keys the CDC table marks
# as deleted. `id` stands in for your primary key column.
bronze_table = DeltaTable.forPath(spark, bronze_table_path)
bronze_table.alias("b").merge(
    cdc_df.alias("c"),
    "b.id = c.id AND c.operation = 'D'"
).whenMatchedDelete().execute()

# Stamp the delete events that were just applied so the next run can
# skip them (assumes the CDC table has a nullable processed_at column)
cdc_table = DeltaTable.forPath(spark, cdc_table_path)
cdc_table.update(
    condition="operation = 'D' AND processed_at IS NULL",
    set={"processed_at": current_timestamp()}
)

In this example, we load the CDC table, use a MERGE to delete every bronze record whose key the CDC table flags as deleted, and finally stamp the applied events with a timestamp so they are not reprocessed on the next run.

You can run this code as a Databricks Job or as part of your pipeline to automate the process of generating the CDC table and deleting records from the bronze table.
