Hello @hidden,
Creating Streaming Delta Live Tables with Manual Upsert Logic
Let's dig in. This question comes up a lot when folks want upsert behavior in DLT but aren't using APPLY CHANGES or Auto-CDC. The short version: DLT doesn't let you drop a foreachBatch into a table declaration, but you can get the pattern working by pairing a streaming DLT table with a downstream merge. It's a bit of a dance, but it's a well-established one.
The Core Pattern: Let DLT Stream, Then Merge Downstream
Think of this in two steps. First, let DLT do what it does best: append new records in a clean, governed way. Then, outside the pipeline, take those records and run your own Delta merge logic. Here's the walkthrough.
Step 1: Build a Simple Append-Only Streaming Table
This is your Bronze layer: just capture what's landing, keep it tidy, and let DLT manage the plumbing.
import dlt
from pyspark.sql.functions import *

@dlt.table(
    comment="Streaming source table",
    table_properties={"quality": "bronze"}
)
def streaming_source():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("s3://your-path/")
    )
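One optional variation worth considering: if the incoming JSON doesn't carry a reliable ordering column, you can stamp records on ingest so the downstream merge and dedup steps have something to sequence on. This is only a sketch; the key column and the ingest_ts name are assumptions about your data, not something DLT requires.

import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(
    comment="Streaming source table with an ingest timestamp (sketch)",
    table_properties={"quality": "bronze"}
)
def streaming_source():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            # hypothetical schema hint for the merge key column
            .option("cloudFiles.schemaHints", "key STRING")
            .load("s3://your-path/")
            # assumed sequencing column, referenced again further down
            .withColumn("ingest_ts", current_timestamp())
    )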
Step 2: Layer in Your Manual Merge Logic
Because DLT streaming tables aren't designed to run foreachBatch, we take the computation downstream. Two patterns work well:
Option A: Handle the Merge in a Separate Notebook/Job
This keeps your DLT pipeline clean while giving you full control over the upsert behavior.
from delta.tables import DeltaTable  # DeltaTable API is used in Option B; here the merge runs as SQL

def upsert_to_target(microBatchDF, batchId):
    # Register the micro-batch so it can be referenced from SQL
    microBatchDF.createOrReplaceTempView("updates")
    # Run the merge through the micro-batch's own session so the temp view is visible
    microBatchDF.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates s
        ON s.key = t.key
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

spark.readStream \
    .table("your_catalog.schema.streaming_source") \
    .writeStream \
    .foreachBatch(upsert_to_target) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoint/path") \
    .start()
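If you'd rather run the merge as a scheduled job instead of an always-on stream, the same writer works with an availableNow trigger: each run processes whatever landed since the last checkpoint and then stops. A minimal sketch reusing the upsert_to_target function above:

spark.readStream \
    .table("your_catalog.schema.streaming_source") \
    .writeStream \
    .foreachBatch(upsert_to_target) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoint/path") \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()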
Option B: Create an Intermediate Streaming Table in DLT, Then Merge Outside the Pipeline
This appeals to folks who want the lineage and governance of DLT on both sides of the hand-off.
@dlt.table
def intermediate_stream():
    return dlt.read_stream("streaming_source")
# In a separate notebook/job, outside the pipeline:
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/path/to/target")

target.alias("t") \
    .merge(
        spark.read.table("your_catalog.schema.intermediate_stream").alias("s"),
        "t.id = s.id"
    ) \
    .whenMatchedUpdate(set={"col1": "s.col1", "col2": "s.col2"}) \
    .whenNotMatchedInsert(values={"id": "s.id", "col1": "s.col1"}) \
    .execute()
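One caveat with either option: MERGE fails if the source has more than one row per key, which happens easily once the intermediate table has accumulated several versions of the same record. Here's a sketch of keeping only the latest row per key before merging, assuming an id key and the ingest_ts column from the earlier sketch:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep the most recent row per key (id and ingest_ts are assumed column names)
latest = (
    spark.read.table("your_catalog.schema.intermediate_stream")
        .withColumn("rn", F.row_number().over(
            Window.partitionBy("id").orderBy(F.col("ingest_ts").desc())))
        .filter("rn = 1")
        .drop("rn")
)

Then merge latest.alias("s") into the target instead of the raw intermediate table.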
A Quick Reality Check
DLT's streaming tables are intentionally append-only. They don't natively support merge semantics because pure streaming engines assume the source is always growing. That's why APPLY CHANGES exists: it brings CDC semantics to a world that otherwise doesn't have them.
So, anytime you need classic upsert behavior, your merge step must live just outside the DLT table definition boundary.
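And for completeness: if your upsert can be expressed declaratively, APPLY CHANGES keeps everything inside the pipeline. A rough sketch, assuming a key column and the ingest_ts ordering column from earlier (adjust both to your schema):

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("target_table")

dlt.apply_changes(
    target="target_table",
    source="streaming_source",
    keys=["key"],                  # assumed primary key column
    sequence_by=col("ingest_ts"),  # assumed ordering column
    stored_as_scd_type=1           # plain upsert (SCD Type 1) semantics
)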
Hope this helps, Louis.