Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Issue while handling Deletes and Inserts in Structured Streaming

bricks_2026
New Contributor II

Hello

We have a framework which reads the CDF logs from the source table and then merges them into the target table.

The logic is implemented so that, if there are multiple commit_versions in the source table, a window function is applied to identify the last operation.

That last operation is then replicated to the target.

This works as long as the operations are inserts and updates.

In one particular scenario, an update of a record is performed upstream by first deleting the row (COMMIT_TYPE = Delete) and then inserting it again (COMMIT_TYPE = Insert) with the changed data. Our framework logic cannot handle this scenario: it selects only the latest commit_type = INSERT and does not consider the delete. Because the record is already present in the target, the logic does nothing and ignores the record completely, so we are missing these updates in the target table. Can you suggest ways to fix this issue?
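For illustration, the miss can be reproduced with a toy, in-memory stand-in for the target table (all names here are hypothetical, not our real schema):

```python
# Toy in-memory stand-in for the target table (hypothetical names).
target = {"k1": "old_value"}  # the record already exists in the target

# CDF log for an upstream "update" performed as delete + re-insert:
# rows are (key, _change_type, _commit_version, value)
cdf = [
    ("k1", "delete", 7, None),
    ("k1", "insert", 8, "new_value"),
]

# Current framework logic: keep only the latest INSERT per key ...
latest = max((r for r in cdf if r[1] == "insert"), key=lambda r: r[2])

# ... and apply it insert-only, so keys already in the target are skipped.
if latest[0] not in target:
    target[latest[0]] = latest[3]

missed = target["k1"]  # still "old_value": the update was silently lost

# One possible fix: apply the latest event according to its change type --
# a delete removes the key, anything else upserts (insert or overwrite).
key, ctype, _, value = latest
if ctype == "delete":
    target.pop(key, None)
else:
    target[key] = value
```

With the insert-only branch the existing key is skipped; applying the latest event by type turns the re-insert into an overwrite of the stale row.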

Thanks in advance for your support. 

Best Regards

1 ACCEPTED SOLUTION


@bricks_2026,

Lakeflow Spark Declarative Pipelines AUTO CDC runs in exactly the same way as a classic SQL/Python pipeline, just with a different runtime version. It can even run on a non-serverless compute if necessary. You can just go ahead and create a Python notebook with the AUTO CDC flow and at least try it out.  

What are the reasons you cannot use it yet? It would be just so much easier to accomplish your task with AUTO CDC.
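If you do try it out, a minimal sketch of such a flow might look like the following. This is only a sketch under assumptions: it runs solely inside a Lakeflow Declarative Pipeline (not as a standalone notebook cell), and the table, view, and column names are placeholders for your own.

```python
import dlt
from pyspark.sql.functions import expr

# Placeholder target streaming table (hypothetical name).
dlt.create_streaming_table("target_table")

dlt.apply_changes(
    target="target_table",
    source="source_cdf_view",         # a view/table exposing the CDF rows
    keys=["key"],                     # primary key column(s)
    sequence_by="_commit_version",    # ordering column: later commits win
    apply_as_deletes=expr("_change_type = 'delete'"),
    except_column_list=["_change_type", "_commit_version", "_commit_timestamp"],
    stored_as_scd_type=1,             # SCD1: keep only the latest state per key
)
```

Because events are sequenced by _commit_version, a delete at version 7 followed by a re-insert at version 8 nets out to an update of the existing target row instead of being dropped.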

Otherwise, with the manual approach, you would need to compare not only the key but every column in the MERGE statement, so that an insert carrying new values updates the existing target row, something like this:

MERGE INTO cdc_data_raw t
USING updates s
  ON s.key = t.key
-- ... delete merge logic ...
WHEN MATCHED THEN UPDATE SET
  a = CASE WHEN s.ts > t.ts AND s.a != t.a THEN s.a ELSE t.a END,
  b = CASE WHEN s.ts > t.ts AND s.b != t.b THEN s.b ELSE t.b END,
  -- ... for every column ...
  ts = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END
WHEN NOT MATCHED THEN INSERT *

Hope it helps.


4 REPLIES

Sumit_7
Honored Contributor

The issue arises because your framework collapses multiple CDF events with a window function and retains only the row from the latest commit_version, which breaks update semantics.

With your source's write pattern, an update arrives in CDF as a delete + insert pair rather than as a single event (a true UPDATE would instead surface as update_preimage/update_postimage rows). By selecting only the latest insert, you lose the preceding delete, so the MERGE skips the change whenever the record already exists in the target. To fix it:

- Avoid collapsing CDF events down to inserts only; carry _change_type through and handle delete, insert, and update_postimage explicitly in your MERGE logic.
- Where the source does emit real updates, use the update_postimage records directly instead of inferring updates.
- If windowing is unavoidable, detect delete → insert patterns per key and treat them as updates.
- Preserve event order via _commit_version so later events win over earlier ones.
- A more robust option is Databricks' APPLY CHANGES INTO (AUTO CDC), which handles ordering and update semantics natively.

Avoid relying solely on "latest insert = truth" in an event-driven system; with these adjustments, the updates will propagate correctly to the target table.

Thanks & Regards,

aleksandra_ch
Databricks Employee

Hi @bricks_2026 ,

I recommend considering a move to AUTO CDC, which handles the merge and window logic of a CDF flow automatically. You would need SCD Type 1 to keep only the last operation. Check out these docs:

Best regards,

Hi @aleksandra_ch,

Many thanks for your answer.

I had a quick look at the links. It seems that they apply to Lakeflow Spark Declarative Pipelines only. Our Cloud Admin Team does not support Declarative Pipelines, so we cannot use them yet.

Regarding the need to collapse the input or source DataFrame (CDF Log): we had to collapse it and use only the last commit version or commit timestamp. Without this, we get the following error:

[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways.

For the same primary key, we would otherwise try to process more than one source record in a single MERGE, which is not allowed:

https://docs.databricks.com/aws/en/error-messages/error-classes#delta_multiple_source_row_matching_t...
