04-15-2026 12:17 PM
Use APPLY CHANGES INTO (SQL) or dlt.apply_changes() (Python). This is the declarative replacement for hand-written foreachBatch MERGE logic in pipelines. A Python example:
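import dlt
from pyspark.sql.functions import col

# Bronze: ingest raw JSON change events with Auto Loader.
# `spark` is provided by the pipeline runtime.
@dlt.table(name="bronze_events")
def bronze_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("abfss://container@account.dfs.core.windows.net/events/")
    )

# The target streaming table must exist before apply_changes() can write to it.
dlt.create_streaming_table(name="customer")

dlt.apply_changes(
    target="customer",
    source="bronze_events",
    keys=["customer_id"],                    # match rows on the business key
    sequence_by=col("event_ts"),             # orders events; latest one wins
    apply_as_deletes=col("op") == "DELETE",  # treat these events as deletes
    stored_as_scd_type=1                     # overwrite in place, no history
)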
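
To make the keys / sequence_by / apply_as_deletes semantics concrete, here is a small in-memory sketch of what an SCD type 1 upsert does conceptually. It is plain Python, not the DLT implementation: the function name apply_changes_scd1 and the sample events are made up for illustration, and it ignores details such as streaming state and how the engine tracks deletes internally.

```python
def apply_changes_scd1(events, key="customer_id", sequence_by="event_ts", op_col="op"):
    """Toy model of SCD type 1 semantics (hypothetical helper, not a DLT API)."""
    latest = {}  # key value -> event with the highest sequence value seen so far
    for ev in events:
        current = latest.get(ev[key])
        # Keep an event only if it is newer than what we already hold,
        # so late-arriving (out-of-order) events do not overwrite newer data.
        if current is None or ev[sequence_by] > current[sequence_by]:
            latest[ev[key]] = ev
    # A key whose most recent event is a DELETE is absent from the target.
    return {k: ev for k, ev in latest.items() if ev[op_col] != "DELETE"}


# Made-up sample data, deliberately out of order.
events = [
    {"customer_id": 1, "name": "Ann",  "event_ts": 1, "op": "INSERT"},
    {"customer_id": 2, "name": "Bob",  "event_ts": 2, "op": "INSERT"},
    {"customer_id": 1, "name": "Anna", "event_ts": 5, "op": "UPDATE"},
    {"customer_id": 1, "name": "Ann2", "event_ts": 3, "op": "UPDATE"},  # arrives late
    {"customer_id": 2, "name": "Bob",  "event_ts": 4, "op": "DELETE"},
]

result = apply_changes_scd1(events)
print(result)
```

Customer 1 ends up with the event_ts=5 row even though an older update arrived after it, and customer 2 is gone because its latest event is a DELETE. That ordering and delete handling is the logic you would otherwise have to hand-code in a foreachBatch MERGE.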