You can achieve this with dlt.apply_changes, but you need to configure it carefully to emulate key-based batch overwrite.
Step 1 — Define Bronze as a Streaming Source
import dlt
from pyspark.sql.functions import col

@dlt.table(
    comment="Bronze snapshot data"
)
def bronze_customers():
    # Incrementally ingest each Parquet snapshot with Auto Loader
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("dbfs:/mnt/source/snapshots/")
    )
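One thing to double-check: the sequence_by column used in Step 2 (IngestedAt) has to exist on the Bronze rows. If your Parquet snapshots don't already carry such a column, you could stamp it during ingestion by extending the table above like this. This is just a sketch, assuming processing time is an acceptable ordering column; the withColumn call is my addition, not part of your source data:

import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(
    comment="Bronze snapshot data, stamped with an ingestion timestamp for sequencing"
)
def bronze_customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("dbfs:/mnt/source/snapshots/")
        # Assumed helper column: lets apply_changes order rows by ingestion time
        .withColumn("IngestedAt", current_timestamp())
    )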
Step 2 — Use dlt.apply_changes in Silver with apply_as_deletes
Normally, dlt.apply_changes merges rows based on a primary key.
To emulate batch-level overwrite, you:
Set the keys to the columns that identify a snapshot batch (the code below uses CustomerNumber and SalesDate).
Provide a sequence_by column to define recency (e.g., IngestedAt).
Enable apply_as_deletes="true" so that all rows for a key from an older batch are removed before the new rows are inserted.

dlt.apply_changes(
    target = "silver_customers",
    source = "bronze_customers",
    keys = ["CustomerNumber", "SalesDate"],
    sequence_by = col("IngestedAt"),   # ingestion or snapshot timestamp
    stored_as_scd_type = "1",          # SCD type 1: keep only the current row per key
    apply_as_deletes = "true"
)
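One addition to the snippet above: as far as I know, the target streaming table has to be declared in the pipeline before apply_changes can write into it, so I would also put something like this right above the apply_changes call (the comment text is mine):

# Declare the Silver target that apply_changes writes into
dlt.create_streaming_table(
    name = "silver_customers",
    comment = "Customers rebuilt per snapshot batch"
)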
Why I think this will work:
If multiple rows exist in the same batch for a given CustomerNumber, they are all preserved, since apply_changes deletes the old set for that key and inserts the new set (it doesn't deduplicate within a batch).
This mimics batch-level replacement semantics without custom MERGE logic; a quick way to sanity-check the behaviour after a run is sketched below. Let me know if it works, and if this was helpful, please mark it as the accepted solution.
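This is the check I would run once the pipeline has processed at least two snapshots, assuming IngestedAt is carried through to Silver and you query the table from the schema your pipeline publishes to (the query is only an illustration):

# Each (CustomerNumber, SalesDate) key should only carry rows from one snapshot;
# any key returned here still mixes rows from more than one IngestedAt batch.
spark.sql("""
    SELECT CustomerNumber, SalesDate,
           COUNT(DISTINCT IngestedAt) AS snapshot_versions
    FROM silver_customers
    GROUP BY CustomerNumber, SalesDate
    HAVING COUNT(DISTINCT IngestedAt) > 1
""").show()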