nayan_wylde
Esteemed Contributor II
Use APPLY CHANGES INTO (SQL) or dlt.apply_changes() (Python). This is the declarative replacement for foreachBatch MERGE logic in pipelines.

 

import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze_events")
def bronze_events():
    """Stream raw JSON event files from cloud storage into ``bronze_events``.

    Returns the streaming DataFrame produced by the ``cloudFiles`` source
    reading the ``events/`` directory of the ABFSS container.
    """
    source_path = "abfss://container@account.dfs.core.windows.net/events/"
    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "json")
    return reader.load(source_path)

# apply_changes() writes into an existing streaming table; it does not create
# one. Declare the target first so the pipeline can resolve "customer".
dlt.create_streaming_table(name="customer")

# Upsert change events from bronze_events into customer, one row per
# customer_id, overwriting in place (SCD type 1 keeps no history).
dlt.apply_changes(
    target="customer",
    source="bronze_events",
    keys=["customer_id"],
    # Ordering column used to decide which event for a key is the latest.
    sequence_by=col("event_ts"),
    # Events whose op column equals "DELETE" are applied as deletes.
    apply_as_deletes=col("op") == "DELETE",
    stored_as_scd_type=1,
)