
Achieving batch-level overwrite for streaming SCD1 in DLT

GuruRio
New Contributor

Hi all,

I am working with Databricks Delta Live Tables (DLT) and have the following scenario:

Setup:

  • Source data is delivered as weekly snapshots (not CDC).

  • I have a bronze layer (streaming table) and a silver layer (also streaming).

  • I am implementing SCD1 logic in the silver layer.

  • Key columns for SCD1 are: CustomerNumber and SalesDate.

Requirement:

  • Sometimes, if there is an issue with the source data, the provider sends the whole weekly snapshot again.

  • In this case, I need to:

    1. Check if rows for the given key combination (CustomerNumber, SalesDate) exist in the silver table.

    2. Delete all existing rows for that key.

    3. Insert all rows from the new batch.

  • Essentially, I want a batch-level overwrite per key in a streaming silver table (roughly the behavior sketched below).
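
To make the intent concrete, here is roughly the behavior I am after, sketched outside DLT against a plain Delta table. The names silver_customers and overwrite_batch are placeholders for illustration only; this is not a proposed solution, since my pipeline has to stay in DLT:

from delta.tables import DeltaTable

def overwrite_batch(microbatch_df, batch_id):
    # 1) Delete every existing silver row whose (CustomerNumber, SalesDate)
    #    appears in the incoming batch
    silver = DeltaTable.forName(spark, "silver_customers")
    batch_keys = microbatch_df.select("CustomerNumber", "SalesDate").distinct()
    (silver.alias("t")
        .merge(batch_keys.alias("s"),
               "t.CustomerNumber = s.CustomerNumber AND t.SalesDate = s.SalesDate")
        .whenMatchedDelete()
        .execute())
    # 2) Append all rows from the new batch
    microbatch_df.write.format("delta").mode("append").saveAsTable("silver_customers")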

Constraints:

  • The solution must use DLT only.

  • I cannot use create_auto_cdc_flow, because it operates on row-level SCD changes, not batch-level snapshots.

Question:

  • Can I achieve this using dlt.apply_changes?

  • If yes, how can I configure it so that all rows in a batch are preserved, and the previous batch for the same keys is fully replaced in a streaming table?

Optional Additional Info:

  • My silver table may have multiple rows per key combination in the same batch.

Thanks in advance for any guidance!

2 REPLIES

ilir_nuredini
Honored Contributor

Hi @GuruRio ,

Instead of rewriting it here, here is a full article on handling data deletion in DLT:
https://www.databricks.com/blog/handling-right-be-forgotten-gdpr-and-ccpa-using-delta-live-tables-dl...

Hope that helps,

Best, Ilir

ManojkMohan
Contributor III

One can achieve this with dlt.apply_changes — but you need to configure it carefully to emulate key-based batch overwrite.

Step 1 — Define Bronze as Streaming Source

import dlt
from pyspark.sql.functions import col

@dlt.table(
    comment="Bronze snapshot data"
)
def bronze_customers():
    # Incrementally ingest the weekly snapshot files with Auto Loader
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("dbfs:/mnt/source/snapshots/")
    )

Step 2 — Use dlt.apply_changes in Silver with apply_as_deletes

Normally, dlt.apply_changes merges rows based on a primary key.

To do a batch-level overwrite, you:

Set the keys to (CustomerNumber, SalesDate).

Provide a sequence_by column to define recency (e.g., IngestedAt).

Enable apply_as_deletes = "true" so that all rows for a key from an older batch are removed before new rows are inserted.

dlt.apply_changes(
    target = "silver_customers",
    source = "bronze_customers",
    keys = ["CustomerNumber", "SalesDate"],
    sequence_by = col("IngestedAt"),  # ingestion or snapshot timestamp
    stored_as_scd_type = "1",
    apply_as_deletes = "true"
)
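
One gap worth noting: the bronze table in Step 1 does not produce an IngestedAt column. A minimal sketch of one way to add it (my assumption, not part of your existing pipeline; a snapshot date taken from the source files would work just as well):

import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(
    comment="Bronze snapshot data, stamped with an ingestion timestamp"
)
def bronze_customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("dbfs:/mnt/source/snapshots/")
        # Hypothetical sequencing column; swap in a real snapshot date if available
        .withColumn("IngestedAt", current_timestamp())
    )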

Why I think this will work:

If multiple rows exist in the same batch for (CustomerNumber, SalesDate), they are all preserved, since apply_changes will delete the old set and insert the new set (it doesn't deduplicate within a batch).

This mimics batch-level replacement semantics without custom MERGE logic. Let me know if it works, and if this is helpful, please mark it as an accepted solution.
