@Bennett Lambertโ :
Yes, it is possible to automatically delete records from the bronze table when a source file is deleted, without doing a full refresh. One way to achieve this is by using the Change Data Capture (CDC) feature in Databricks Delta.
CDC tracks changes to a Delta table and generates a change table that records the insert, update, and delete operations on the table. You can use the CDC table to keep track of the changes to the bronze table and delete the corresponding records when the source file is deleted.
To enable CDC for a Delta table, you need to generate a CDC table using the delta.log API. Here's an example of how to generate a CDC table:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable
bronze_table_path = "/mnt/s3/source/bronze"
cdc_table_path = "/mnt/s3/source/cdc"
# Enable CDC for bronze table
DeltaTable.forPath(spark, bronze_table_path).generate(deltaPath=cdc_table_path)
# Get CDC table
cdc_table = DeltaTable.forPath(spark, cdc_table_path)
# Delete records from bronze table when source file is deleted
bronze_table = DeltaTable.forPath(spark, bronze_table_path)
bronze_table.alias("b").merge(
cdc_table.alias("c"),
"b.<primary key column> = c.<primary key column> and c.operation = 'D'"
).whenMatchedDelete().execute()
# Update the CDC table with the latest timestamp
cdc_table.alias("c").merge(
bronze_table.alias("b")
).whenMatchedUpdate(set={"timestamp": current_timestamp()}).whenNotMatchedInsert(values={"timestamp": current_timestamp()}).execute()
In this example, we first enable CDC for the bronze table by generating a CDC table. We then get the CDC table and use it to delete the corresponding records from the bronze table when a delete operation is detected. Finally, we update the CDC table with the latest timestamp.
You can run this code as a Databricks Job or as a part of your pipeline to automate the process of generating the CDC table and deleting records from the bronze table.