a week ago
Hi all, I'm running a Medallion streaming pipeline on Databricks using DLT (bronze → staging silver view → silver table). I ran into an issue and would appreciate any advice or best practices.
What Iām doing
Ingesting streaming data into a streaming bronze table.
Creating a staging silver view that applies transformations.
Writing to a streaming silver table.
The problem
When I delete a record in the bronze table, it breaks downstream because deletes aren't supported in my streaming target. To work around this I enabled options to ignore deletes / change commits, which prevents the error, but now the pipeline no longer picks up from the previous checkpoint. Instead it re-ingests all data into the silver layer (effectively a full reprocessing).
I know a full refresh fixes the checkpoint mismatch, but that's not acceptable for us: we have very large volumes, and the upstream cdl_ingested_date would be refreshed on a full reload, so a full refresh is undesirable.
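For reference, the workaround I enabled looks roughly like this (a sketch of the Delta streaming reader options I mean; bronze_uc_name is our bronze table's Unity Catalog name):

bronze_df = (
    spark.readStream.format("delta")
    .option("ignoreDeletes", "true")       # skip transactions that delete data at partition boundaries
    .option("skipChangeCommits", "true")   # skip commits that update or delete existing rows
    .table(bronze_uc_name)
)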
Questions
What is the recommended/best-practice approach for handling deletes in a streaming Medallion pipeline on Databricks (DLT)?
Is there a safe way to ignore deletes without forcing a full re-ingest / losing the previous checkpoint?
When rolling back or recovering streaming tables, what are considered best practices?
a week ago - last edited a week ago
Hey @Rainier_dw, have you read up much on CDC? https://www.databricks.com/blog/2022/04/25/simplifying-change-data-capture-with-databricks-delta-liv... I wonder if it's an appropriate fit.
I'm currently going through the Data Engineering certification and stumbled across the MERGE INTO command, which is also referenced in the article above. It provides really useful "when [not] matched then {delete|insert|update}" functionality: https://www.databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html. Perhaps this is a good angle to leverage.
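A rough sketch of what I mean, run from PySpark (the table and column names here are made up):

spark.sql("""
    MERGE INTO silver.customers AS t
    USING bronze_changes AS s
    ON t.customer_id = s.customer_id
    WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")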
I guess this doesn't address your 3rd question; I'm also interested in that one. Have you tried the Time Travel functionality with Delta tables? How have you found it to behave with streaming tables?
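For rollback, I had something like this in mind (illustrative only; the table name and version are made up, and I don't know yet how downstream streams react to it):

# Roll a Delta table back to an earlier version
spark.sql("RESTORE TABLE silver.customers TO VERSION AS OF 42")

# Or read an older snapshot without modifying the table
old_df = spark.read.option("versionAsOf", 42).table("silver.customers")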
All the best,
BS
a week ago
Thanks for the response. CDC was applied upstream before the error; this problem happens when rolling back a streaming table.
I've implemented a Change Data Feed read (readChangeFeed) in the staging silver view, and so far it's returning the expected results. I'm filtering for inserts only:
from pyspark.sql.functions import col  # needed for the filter below

bronze_df = spark.readStream.format("delta").option("readChangeFeed", "true").table(bronze_uc_name)
bronze_df_inserts_only = bronze_df.filter(col("_change_type") == "insert")  # keep only inserted rows
I still need to run more tests to make sure it consistently identifies the correct changes across edge cases. I'll post an update once I've validated it in broader testing.
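In DLT terms the staging view ends up looking roughly like this (a sketch; the view name is made up):

import dlt
from pyspark.sql.functions import col

@dlt.view(name="silver_staging_inserts")  # hypothetical view name
def silver_staging_inserts():
    # Stream the bronze table's Change Data Feed and pass through inserted rows only
    return (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table(bronze_uc_name)
        .filter(col("_change_type") == "insert")
    )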
a week ago
I'm not entirely sure if I'm missing something here, but as far as I know there's a golden rule in DWH applications: you never hard delete records, you use soft deletes instead. So I'm a bit puzzled why a hard delete is being used in this case.
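By soft delete I mean flagging the row instead of removing it, e.g. (hypothetical table and column names):

spark.sql("""
    UPDATE silver.customers
    SET is_deleted = true, deleted_at = current_timestamp()
    WHERE customer_id = '42'
""")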
a week ago
Sometimes the client needs that. And in certain business domains there are regulations that force you to implement hard deletes 🙂
a week ago
Yes, I know. I also had a request in the past to delete records older than 5 years due to GDPR rules, so we created a procedure that automatically deletes them based on a threshold. Especially in Europe, this is a common issue. On-prem it was pretty easy because we used batch mode, of course, but I'm not sure how this could be applied to streaming tables. It's clearly a case to follow, at least for me :)
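On Delta, the batch side of that would be something like this (a sketch; the table and column names are made up):

# Delete records older than the 5-year retention threshold
spark.sql("""
    DELETE FROM silver.events
    WHERE ingested_date < add_months(current_date(), -60)
""")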
a week ago
In this case, we are working with a major retail client that is migrating their systems from on-premises to Azure Databricks. Some of their tables will continue pulling from their original on-premises systems while we investigate all the SQL Server stored procedures they have in place. Additionally, some tables will be file-ingested. Since some files might be faulty, the ability to delete/roll back ingested data is a critical requirement for the migration process.