Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Rollbacks/deletes on streaming table

Rainier_dw
New Contributor III

Hi all — I’m running a Medallion streaming pipeline on Databricks using DLT (bronze → staging silver view → silver table). I ran into an issue and would appreciate any advice or best practices.

What I’m doing

  • Ingesting streaming data into a streaming bronze table.

  • Creating a staging silver view that applies transformations.

  • Writing to a streaming silver table.
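
For context, a minimal DLT Python sketch of that shape might look like the following (the source path, table names, and columns here are placeholders, not our actual pipeline):

import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze_events")
def bronze_events():
    # Streaming ingest into the bronze layer (Auto Loader assumed here)
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/raw/events/")
    )

@dlt.view(name="silver_staging")
def silver_staging():
    # Staging view that applies transformations on the bronze stream
    return dlt.read_stream("bronze_events").withColumn("amount", col("amount").cast("double"))

@dlt.table(name="silver_events")
def silver_events():
    # Streaming silver table fed by the staging view
    return dlt.read_stream("silver_staging")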

The problem
When I delete a record in the bronze table, it breaks downstream because deletes aren’t supported in my streaming target. To work around this I enabled options to ignore deletes / change commits, which prevents the error — but now the pipeline no longer picks up from the previous checkpoint. Instead it re-ingests all data into the silver layer (effectively a full reprocessing).

I know a full refresh fixes the checkpoint mismatch, but that's not acceptable for us: we have very large data volumes, and the upstream cdl_ingested_date values would be reset by a full reload, so a full refresh is undesirable.

Questions

  1. What is the recommended/best-practice approach for handling deletes in a streaming Medallion pipeline on Databricks (DLT)?

  2. Is there a safe way to ignore deletes without forcing a full re-ingest / losing the previous checkpoint?

  3. When rolling back or recovering streaming tables, what are considered best practices?


6 REPLIES

BS_THE_ANALYST
Honored Contributor III

Hey @Rainier_dw, have you read up much on CDC? https://www.databricks.com/blog/2022/04/25/simplifying-change-data-capture-with-databricks-delta-liv... . I wonder if this is an appropriate fit.

I'm currently going through the Data Engineering certification and have stumbled across the MERGE INTO command. It's also referenced in the article above, and it provides really useful "when [not] matched then {delete|insert|update}" functionality: https://www.databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html. Perhaps this is a good angle to leverage (a minimal sketch follows below).
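
A minimal sketch of that MERGE pattern with the Delta Lake Python API (the table names, join key, and the updates source below are made up for illustration):

from delta.tables import DeltaTable

updates_df = spark.table("catalog.schema.silver_updates")            # hypothetical staging of incoming changes
silver = DeltaTable.forName(spark, "catalog.schema.silver_events")   # hypothetical target table

(
    silver.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")
    .whenMatchedDelete(condition="s.is_deleted = true")   # propagate deletes
    .whenMatchedUpdateAll()                               # apply updates
    .whenNotMatchedInsertAll()                            # insert new rows
    .execute()
)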

I guess this doesn't address your 3rd question; I'm also interested in that one. Have you tried the Time Travel functionality with Delta tables? How have you found it to behave with streaming tables?

All the best,
BS

Thanks for the response. CDC was applied upstream before the error; this problem happens when rolling back a streaming table.

I've implemented a read-change-feed in the staging silver view and so far it's returning the expected results. I'm filtering for inserts only:

from pyspark.sql.functions import col

# Stream the bronze table's Change Data Feed and keep only inserted rows
bronze_df = spark.readStream.format("delta").option("readChangeFeed", "true").table(bronze_uc_name)
bronze_df_inserts_only = bronze_df.filter(col("_change_type") == "insert")

I still need to run more tests to make sure it consistently identifies the correct changes across edge cases. I'll post an update once I've validated it in broader testing.
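
One caveat with this approach: readChangeFeed only works if Change Data Feed is enabled on the source table, e.g. via a table property on bronze (the table name below is a placeholder):

# Change Data Feed must be enabled on the bronze table for readChangeFeed to work
spark.sql(
    "ALTER TABLE catalog.schema.bronze_events "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)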

dalcuovidiu
New Contributor II

I'm not entirely sure if I'm missing something here, but as far as I know there's a golden rule in DWH applications: you never hard delete records; you use soft deletes instead. So I'm a bit puzzled why a hard delete is being used in this case.
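
For anyone following, a soft delete usually just means flagging the row instead of physically removing it, e.g. something like this (placeholder table and column names):

# Soft delete: mark the row as deleted rather than removing it
spark.sql(
    "UPDATE catalog.schema.silver_events "
    "SET is_deleted = true, deleted_at = current_timestamp() "
    "WHERE id = 42"
)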

szymon_dybczak
Esteemed Contributor III

Sometimes the client needs that. And for certain business domains there are regulations that require you to implement hard deletes 🙂

Yes, I know. I also had a request in the past to delete records older than 5 years due to GDPR rules, so we created a procedure that automatically deletes them based on a threshold. Especially in Europe, this is a common issue. On-prem this was pretty easy because we used batch mode, but I'm not sure how this could be applied to streaming tables. It's clearly a case worth following, at least for me :)
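
For what it's worth, on Delta tables a batch retention delete like that is just a conditional DELETE, roughly like this (the table and date column below are placeholders):

# Threshold-based retention delete (GDPR-style), placeholder names
spark.sql(
    "DELETE FROM catalog.schema.customer_events "
    "WHERE event_date < date_sub(current_date(), 5 * 365)"
)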

In this case, we are working with a major retail client that is migrating their systems from on-premises to Azure Databricks. Some of their tables will continue pulling from their original on-premises systems while we investigate all the SQL Server stored procedures they have in place. Additionally, some tables will be file-ingested. Since some files might be faulty, the ability to delete/roll back ingested data is a critical requirement for the migration process.