Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables upsert logic without APPLY CHANGES or Auto CDC logic

hidden
New Contributor II

I want to create Delta Live Tables that are streaming, and I want to use manual upsert logic without using the APPLY CHANGES API or the Auto CDC API. How can I do it?

 

1 REPLY

Louis_Frolio
Databricks Employee

Hello @hidden,

Creating Streaming Delta Live Tables with Manual Upsert Logic

 

Let's dig in… this question comes up a lot when folks want upsert behavior in DLT but aren't using APPLY CHANGES or Auto-CDC. The short version: DLT doesn't let you drop a foreachBatch into a table declaration, but you can get the pattern working by pairing a streaming DLT table with a downstream merge. It's a bit of a dance, but it's a well-established one.

 

The Core Pattern: Let DLT Stream, Then Merge Downstream

Think of this in two steps. First, let DLT do what it does best: append new records in a clean, governed way. Then, outside the pipeline, take those records and run your own Delta merge logic. Here's the walkthrough.

Step 1: Build a Simple Append-Only Streaming Table

This is your Bronze layer: just capture what's landing, keep it tidy, and let DLT manage the plumbing.

import dlt

@dlt.table(
  comment="Streaming source table",
  table_properties={"quality": "bronze"}
)
def streaming_source():
  # Auto Loader picks up new JSON files incrementally as they land
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://your-path/")
  )
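
Side note: if "keep it tidy" also covers basic data quality, DLT expectations can be layered onto the same declaration. A minimal sketch, assuming the incoming JSON carries a key column to validate (the expectation name and column are placeholders, not from the original reply):

import dlt

@dlt.table(
  comment="Streaming source table",
  table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("valid_key", "key IS NOT NULL")  # drop rows missing the merge key
def streaming_source():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://your-path/")
  )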

 

Step 2: Layer in Your Manual Merge Logic

Because DLT streaming tables aren't designed to run foreachBatch, we take the computation downstream. Two patterns work well:

Option A: Handle the Merge in a Separate Notebook/Job

This keeps your DLT pipeline clean while giving you full control over the upsert behavior.

def upsert_to_target(microBatchDF, batchId):
  # Expose the micro-batch as a temp view for the MERGE statement
  microBatchDF.createOrReplaceTempView("updates")

  # Use the micro-batch's own session so the temp view resolves reliably
  microBatchDF.sparkSession.sql("""
    MERGE INTO target_table t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Read the table the pipeline publishes and merge each micro-batch
spark.readStream \
  .table("your_catalog.schema.streaming_source") \
  .writeStream \
  .foreachBatch(upsert_to_target) \
  .outputMode("update") \
  .option("checkpointLocation", "/checkpoint/path") \
  .start()
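
One prerequisite worth calling out: MERGE INTO needs the target table to exist before the first micro-batch arrives. A minimal sketch of creating it up front; the column names are placeholders, so match them to your actual payload and qualify the name with catalog/schema as needed:

# Run once before starting the stream
spark.sql("""
  CREATE TABLE IF NOT EXISTS target_table (
    key  STRING,
    col1 STRING,
    col2 STRING
  )
""")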

 

Option B: Create an Intermediate Streaming Table in DLT, Then Merge Outside the Pipeline

This appeals to folks who want the lineage and governance of DLT on both sides of the data movement.

import dlt

# Runs inside the DLT pipeline: a governed, append-only pass-through table
@dlt.table
def intermediate_stream():
  return dlt.read_stream("streaming_source")

# Runs outside the pipeline (separate notebook/job): merge into the target
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/path/to/target")
target.alias("t") \
  .merge(
    spark.read.table("your_catalog.schema.intermediate_stream").alias("s"),
    "t.id = s.id"
  ) \
  .whenMatchedUpdate(set={"col1": "s.col1", "col2": "s.col2"}) \
  .whenNotMatchedInsert(values={"id": "s.id", "col1": "s.col1", "col2": "s.col2"}) \
  .execute()
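
If re-reading the whole intermediate table each run gets heavy, the merge side of Option B can also be driven incrementally, the same way as Option A. A sketch assuming the pipeline publishes to your_catalog.schema and reusing the upsert_to_target function from Option A:

# trigger(availableNow=True) drains whatever is new and then stops,
# so this can run as a scheduled job after each pipeline update
(spark.readStream
  .table("your_catalog.schema.intermediate_stream")
  .writeStream
  .foreachBatch(upsert_to_target)  # merge function from Option A
  .option("checkpointLocation", "/checkpoint/intermediate_merge")
  .trigger(availableNow=True)
  .start())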

 

A Quick Reality Check

DLT's streaming tables are intentionally append-only. They don't natively support merge semantics because pure streaming engines assume the source is always growing. That's why APPLY CHANGES exists: it brings CDC semantics to a world that otherwise doesn't have them.

So, anytime you need classic upsert behavior, your merge step must live just outside the DLT table definition boundary.
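
For contrast only, here is roughly what the same upsert looks like when you do let APPLY CHANGES handle it; the key and event_timestamp columns are placeholders, not taken from your data:

import dlt

# The built-in CDC route, shown only for comparison with the manual merge above
dlt.create_streaming_table("target_table")

dlt.apply_changes(
  target="target_table",
  source="streaming_source",
  keys=["key"],                  # merge key(s)
  sequence_by="event_timestamp"  # orders late or out-of-order changes
)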

Hope this helps, Louis.
