Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Recommended approach for handling deletes in a Delta table

WiliamRosa
New Contributor III

What is the recommended approach for handling deletes in a Delta table?
I have a table in MySQL (no soft delete flag) that I read and write into Azure as a Delta table. My current flow is:
- If an ID exists in both MySQL and the Delta table → update the record in Delta.
- If an ID exists in MySQL but not in Delta → insert it into Delta.
The challenge is: how do I handle deletes? Specifically, if an ID exists in the Delta table but does not exist in the latest MySQL extract, I want to remove it from Delta. What's the best way to implement this logic?

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa
1 ACCEPTED SOLUTION


nayan_wylde
Honored Contributor II

The recommended way of handling CDC (updates, inserts, and deletes) in Databricks is the MERGE INTO command.

https://docs.databricks.com/aws/en/sql/language-manual/delta-merge-into

If you are using SQL:

-- Delete all target rows that have a match in the source table.
MERGE INTO target USING source
  ON target.key = source.key
  WHEN MATCHED THEN DELETE

-- Conditionally update target rows that have a match in the source table using the source value.
MERGE INTO target USING source
  ON target.key = source.key
  WHEN MATCHED AND target.updated_at < source.updated_at THEN UPDATE SET *

-- Multiple MATCHED clauses: conditionally delete matched target rows and update two columns for all other matched rows.
MERGE INTO target USING source
  ON target.key = source.key
  WHEN MATCHED AND target.marked_for_deletion THEN DELETE
  WHEN MATCHED THEN UPDATE SET target.updated_at = source.updated_at, target.value = DEFAULT
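
For the exact case described in the question (an ID exists in Delta but is missing from the latest MySQL extract), the delete can be folded into the same MERGE with a WHEN NOT MATCHED BY SOURCE clause (available in Databricks Runtime 12.2 LTS and above). A minimal sketch, assuming the latest extract is staged as a view named source and the key column is id:

-- Upsert from the latest extract and delete target rows that no longer exist in it.
MERGE INTO target USING source
  ON target.id = source.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  WHEN NOT MATCHED BY SOURCE THEN DELETE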

 

If you are using Python:

from delta.tables import DeltaTable

# 'deltaTable' is the target Delta table and 'sourceDF' is a DataFrame
# holding the latest MySQL extract to be merged.

deltaTable = DeltaTable.forPath(spark, "/path/to/your/delta/table")

(deltaTable.alias("target")
    .merge(sourceDF.alias("source"), "target.key_column = source.key_column")
    .whenMatchedUpdateAll()          # update rows present in both
    .whenNotMatchedInsertAll()       # insert rows only in the source
    .whenNotMatchedBySourceDelete()  # delete rows missing from the source
    .execute())
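
Note: whenNotMatchedBySourceDelete() and the SQL WHEN NOT MATCHED BY SOURCE clause require Databricks Runtime 12.2 LTS or above (Delta Lake 2.3+). On older runtimes you would instead anti-join the Delta table against the latest extract and issue a separate DELETE for the missing keys.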

