<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Delta live tables upsert logic without apply changes or autocdc logic in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-upsert-logic-without-apply-changes-or-autocdc/m-p/141169#M51644</link>
    <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/183679"&gt;@hidden&lt;/a&gt;&amp;nbsp;,&amp;nbsp;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Creating Streaming Delta Live Tables with Manual Upsert Logic&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;Let’s dig in… this question comes up a lot when folks want upsert behavior in DLT but aren’t using APPLY CHANGES or Auto-CDC. The short version: DLT &lt;STRONG&gt;doesn’t let you drop a &lt;SPAN class="s2"&gt;foreachBatch&lt;/SPAN&gt; into a table declaration&lt;/STRONG&gt;, but you &lt;I&gt;can&lt;/I&gt; get the pattern working by pairing a streaming DLT table with a downstream merge. It’s a bit of a dance, but it’s a well-established one.&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;The Core Pattern: Let DLT Stream, Then Merge Downstream&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p1"&gt;Think of this in a two steps. First, let DLT do what it does best: append new records in a clean, governed way. Then, outside the pipeline, you take those records and run your own Delta merge logic. Here’s the walkthrough.&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 1: Build a Simple Append-Only Streaming Table&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p1"&gt;This is your Bronze layer—just capture what’s landing, keep it tidy, and let DLT manage the plumbing.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import dlt
from pyspark.sql.functions import *

@dlt.table(
  comment="Streaming source table",
  table_properties={"quality": "bronze"}
)
def streaming_source():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://your-path/")
  )&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 2: Layer in Your Manual Merge Logic&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;Because DLT streaming tables aren’t designed to run &lt;SPAN class="s2"&gt;foreachBatch&lt;/SPAN&gt;, we take the computation downstream. Two patterns work well:&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Option A: Handle the Merge in a Separate Notebook/Job&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;This keeps your DLT pipeline clean while giving you full control over the upsert behavior.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;def upsert_to_target(microBatchDF, batchId):
  # Expose the micro-batch as a temp view for the MERGE below.
  microBatchDF.createOrReplaceTempView("updates")

  # Run the MERGE through the micro-batch's own session so the temp view
  # is visible (the global `spark` may differ on shared/serverless compute).
  microBatchDF.sparkSession.sql("""
    MERGE INTO target_table t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

spark.readStream \
  .table("your_catalog.schema.streaming_source") \
  .writeStream \
  .foreachBatch(upsert_to_target) \
  .outputMode("update") \
  .option("checkpointLocation", "/checkpoint/path") \
  .start()&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Option B: Create an Intermediate Streaming Table in DLT, Then Merge Outside the Pipeline&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;This appeals to folks who want the lineage and governance of DLT on both sides of the movement.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;# Inside the DLT pipeline:
import dlt

@dlt.table
def intermediate_stream():
  return dlt.read_stream("streaming_source")

# Outside the pipeline (separate notebook/job):
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/path/to/target")
target.alias("t") \
  .merge(
    spark.read.table("your_catalog.schema.intermediate_stream").alias("s"),
    "t.id = s.id"
  ) \
  .whenMatchedUpdate(set={"col1": "s.col1", "col2": "s.col2"}) \
  .whenNotMatchedInsert(values={"id": "s.id", "col1": "s.col1"}) \
  .execute()&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;A Quick Reality Check&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;DLT’s streaming tables are intentionally append-only. They don’t natively support merge semantics because pure streaming engines assume the source is always growing. That’s why APPLY CHANGES exists—it brings CDC semantics to a world that otherwise doesn’t have them.&lt;/P&gt;
&lt;P class="p3"&gt;So, anytime you need classic upsert behavior, your merge step must live just outside the DLT table definition boundary.&lt;/P&gt;
&lt;P class="p3"&gt;Hope this helps, Louis.&lt;/P&gt;</description>
    <pubDate>Thu, 04 Dec 2025 14:40:14 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-12-04T14:40:14Z</dc:date>
    <item>
      <title>Delta live tables upsert logic without apply changes or autocdc logic</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-upsert-logic-without-apply-changes-or-autocdc/m-p/141145#M51631</link>
      <description>&lt;P&gt;i want to create&amp;nbsp; delta live tables which should be streaming&amp;nbsp; and i want to use the manual upsert logic without using the apply changes api or autocdc api . how can i do it&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 04 Dec 2025 11:37:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-tables-upsert-logic-without-apply-changes-or-autocdc/m-p/141145#M51631</guid>
      <dc:creator>hidden</dc:creator>
      <dc:date>2025-12-04T11:37:30Z</dc:date>
    </item>
    <item>
      <title>Re: Delta live tables upsert logic without apply changes or autocdc logic</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-tables-upsert-logic-without-apply-changes-or-autocdc/m-p/141169#M51644</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/183679"&gt;@hidden&lt;/a&gt;&amp;nbsp;,&amp;nbsp;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Creating Streaming Delta Live Tables with Manual Upsert Logic&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;Let’s dig in… this question comes up a lot when folks want upsert behavior in DLT but aren’t using APPLY CHANGES or Auto-CDC. The short version: DLT &lt;STRONG&gt;doesn’t let you drop a &lt;SPAN class="s2"&gt;foreachBatch&lt;/SPAN&gt; into a table declaration&lt;/STRONG&gt;, but you &lt;I&gt;can&lt;/I&gt; get the pattern working by pairing a streaming DLT table with a downstream merge. It’s a bit of a dance, but it’s a well-established one.&lt;/P&gt;
&lt;P class="p2"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;The Core Pattern: Let DLT Stream, Then Merge Downstream&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p1"&gt;Think of this in a two steps. First, let DLT do what it does best: append new records in a clean, governed way. Then, outside the pipeline, you take those records and run your own Delta merge logic. Here’s the walkthrough.&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 1: Build a Simple Append-Only Streaming Table&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p1"&gt;This is your Bronze layer—just capture what’s landing, keep it tidy, and let DLT manage the plumbing.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import dlt
from pyspark.sql.functions import *

@dlt.table(
  comment="Streaming source table",
  table_properties={"quality": "bronze"}
)
def streaming_source():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://your-path/")
  )&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 2: Layer in Your Manual Merge Logic&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;Because DLT streaming tables aren’t designed to run &lt;SPAN class="s2"&gt;foreachBatch&lt;/SPAN&gt;, we take the computation downstream. Two patterns work well:&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Option A: Handle the Merge in a Separate Notebook/Job&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;This keeps your DLT pipeline clean while giving you full control over the upsert behavior.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;def upsert_to_target(microBatchDF, batchId):
  # Expose the micro-batch as a temp view for the MERGE below.
  microBatchDF.createOrReplaceTempView("updates")

  # Run the MERGE through the micro-batch's own session so the temp view
  # is visible (the global `spark` may differ on shared/serverless compute).
  microBatchDF.sparkSession.sql("""
    MERGE INTO target_table t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

spark.readStream \
  .table("your_catalog.schema.streaming_source") \
  .writeStream \
  .foreachBatch(upsert_to_target) \
  .outputMode("update") \
  .option("checkpointLocation", "/checkpoint/path") \
  .start()&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Option B: Create an Intermediate Streaming Table in DLT, Then Merge Outside the Pipeline&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;This appeals to folks who want the lineage and governance of DLT on both sides of the movement.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;# Inside the DLT pipeline:
import dlt

@dlt.table
def intermediate_stream():
  return dlt.read_stream("streaming_source")

# Outside the pipeline (separate notebook/job):
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/path/to/target")
target.alias("t") \
  .merge(
    spark.read.table("your_catalog.schema.intermediate_stream").alias("s"),
    "t.id = s.id"
  ) \
  .whenMatchedUpdate(set={"col1": "s.col1", "col2": "s.col2"}) \
  .whenNotMatchedInsert(values={"id": "s.id", "col1": "s.col1"}) \
  .execute()&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;A Quick Reality Check&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P class="p3"&gt;DLT’s streaming tables are intentionally append-only. They don’t natively support merge semantics because pure streaming engines assume the source is always growing. That’s why APPLY CHANGES exists—it brings CDC semantics to a world that otherwise doesn’t have them.&lt;/P&gt;
&lt;P class="p3"&gt;So, anytime you need classic upsert behavior, your merge step must live just outside the DLT table definition boundary.&lt;/P&gt;
&lt;P class="p3"&gt;Hope this helps, Louis.&lt;/P&gt;</description>
      <pubDate>Thu, 04 Dec 2025 14:40:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-tables-upsert-logic-without-apply-changes-or-autocdc/m-p/141169#M51644</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-12-04T14:40:14Z</dc:date>
    </item>
  </channel>
</rss>

