<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/updating-a-delta-table-in-delta-live-tables-dlt-from-two-event/m-p/139234#M11029</link>
    <description>&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;To achieve robust, persistent CDC (Change Data Capture)–style updates in Databricks DLT with your scenario—&lt;STRONG&gt;while keeping data_predictions as a Delta Table (not a Materialized View)&lt;/STRONG&gt;—you need to carefully avoid streaming joins and side effects across streaming tables in a single DLT pipeline, because of DLT’s constraints on table references and state handling.&lt;/P&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Below is a best-practice architecture and pipeline pattern for your use case:&lt;/P&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Key Solution Principles&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Ingest Predictor and Confirmer streams each to their own staging Delta Tables&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(no materialized views or joins here).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Make data_predictions a Delta Table&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;with historical CDC semantics (append-only with late-arriving data).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Perform updates via&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;MERGE INTO&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(upserts) using a batch job or a separate DLT pipeline&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;instead of streaming/SQL views, as true updates on existing Delta Tables from multiple sources in a single DLT pipeline are not well supported.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Retain historical data&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;by&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;enabling Delta Lake time travel&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;and&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;mergeSchema&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;options, so even after pipeline restarts, your table’s history is intact and queryable.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Step-by-Step Pipeline Design&lt;/H2&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;1. Stage Each Stream in Its Own Raw Table&lt;/H2&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;python&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;table&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;name&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_predictor"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;def&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;raw_predictor&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;:&lt;/SPAN&gt;
    df &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;readStream
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;format&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"kafka"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;options&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token operator"&gt;**&lt;/SPAN&gt;KAFKA_OPTIONS_PREDICTOR&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;load&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;selectExpr&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"CAST(value AS STRING) as json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;from_json&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;col&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; schema&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;# flatten, select, etc...&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"etl_ingest_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; current_timestamp&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;return&lt;/SPAN&gt; df

&lt;SPAN class="token token decorator annotation punctuation"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;table&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;name&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_confirmer"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;def&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;raw_confirmer&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;:&lt;/SPAN&gt;
    df &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;readStream
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;format&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"kafka"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;options&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token operator"&gt;**&lt;/SPAN&gt;KAFKA_OPTIONS_CONFIRMER&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;load&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;selectExpr&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"CAST(value AS STRING) as json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;from_json&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;col&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; schema_update&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;# flatten, select, etc...&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"confirmer_etl_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; current_timestamp&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;return&lt;/SPAN&gt; df
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;BLOCKQUOTE&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;These are&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;persisted, append-only Delta tables&lt;/STRONG&gt;—not materialized views.&lt;/P&gt;
&lt;/BLOCKQUOTE&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;2. Build/Persist the Gold Table (&lt;CODE&gt;data_predictions&lt;/CODE&gt;) via a Batch Upsert&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Create a separate batch pipeline or notebook&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;that periodically runs a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;MERGE INTO&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;operation to keep&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;current with the latest from both staging tables, using a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;uuid&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;key.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Example batch upsert job:&lt;/P&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;python&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token"&gt;# in a notebook or DLT batch pipeline cell&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Load tables&lt;/SPAN&gt;
df_predictor &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;read&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;table&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_predictor"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
df_confirmer &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;read&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;table&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_confirmer"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Prepare the upsert source by joining the latest confirmer data&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;from&lt;/SPAN&gt; pyspark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;sql &lt;SPAN class="token token"&gt;import&lt;/SPAN&gt; functions &lt;SPAN class="token token"&gt;as&lt;/SPAN&gt; F

df_upsert &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;join&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    df_confirmer&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    on&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"uuid"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    how&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"left"&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;"uuid"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;"predictions"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;coalesce&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;coalesce&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"etl_ingest_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;when&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;isNotNull&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt; &lt;SPAN class="token token operator"&gt;|&lt;/SPAN&gt; df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;isNotNull&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"confirmer_etl_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;otherwise&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"timestamp_update"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"timestamp_update"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Merge (upsert) into persistent Delta Table&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;from&lt;/SPAN&gt; delta&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;tables &lt;SPAN class="token token"&gt;import&lt;/SPAN&gt; DeltaTable

deltaTable &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; DeltaTable&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;forPath&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;spark&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;"/mnt/path/to/data_predictions"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    deltaTable&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"t"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;merge&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        df_upsert&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"s"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;"t.uuid = s.uuid"&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;whenMatchedUpdateAll&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;whenNotMatchedInsertAll&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;execute&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Schedule this merge as a Databricks Job&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;every few minutes or hours (frequency depends on latency requirements).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Delta will retain full historical records. Use&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;Delta Time Travel&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(&lt;CODE&gt;VERSION AS OF&lt;/CODE&gt;,&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;TIMESTAMP AS OF&lt;/CODE&gt;), schema evolution, and keep the&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;table&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;as a true Delta Table&lt;/EM&gt;, not a view. Pipelines restarts will not destroy historical data.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Additional Tips&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Never use streaming joins or self-updating tables inside DLT for this CDC merge.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;DLT wants a strictly acyclic DAG for tables, and streaming joins typically result in views—not Delta Tables—and lose history.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Consider watermarking and data retention&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;on the raw tables, if storage is a concern.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Late-arriving data is handled:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;Because all upserts/merges are re-run on new Confirmer data, historical Predictor rows are updated as soon as late Confirmer data arrives.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;You can time travel historical states using Delta’s time travel features:&lt;/P&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;sql&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token"&gt;SELECT&lt;/SPAN&gt; &lt;SPAN class="token token operator"&gt;*&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;FROM&lt;/SPAN&gt; data_predictions VERSION &lt;SPAN class="token token"&gt;AS&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;OF&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;10&lt;/SPAN&gt;
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;If you want the&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;to always reflect only the most recent state per uuid, keep only the latest row per uuid during the upsert.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Illustrated Workflow&lt;/H2&gt;
&lt;DIV class="group relative"&gt;
&lt;DIV class="w-full overflow-x-auto md:max-w-[90vw] border-subtlest ring-subtlest divide-subtlest bg-transparent"&gt;
&lt;TABLE class="border-subtler my-[1em] w-full table-auto border-separate border-spacing-0 border-l border-t"&gt;
&lt;THEAD class="bg-subtler"&gt;
&lt;TR&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Source&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;DLT Table&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Upsert Engine&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Gold Table (Delta Table)&lt;/TH&gt;
&lt;/TR&gt;
&lt;/THEAD&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Event Hub 1&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;raw_predictor (Delta)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Batch Notebook/Job&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;data_predictions (Delta Table)&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Event Hub 2&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;raw_confirmer (Delta)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;(merge on uuid)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;(all history retained by Delta)&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;/DIV&gt;
&lt;DIV class="bg-base border-subtler shadow-subtle pointer-coarse:opacity-100 right-xs absolute bottom-0 flex rounded-lg border opacity-0 transition-opacity group-hover:opacity-100 [&amp;amp;&amp;gt;*:not(:first-child)]:border-subtle [&amp;amp;&amp;gt;*:not(:first-child)]:border-l"&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;&amp;nbsp;&lt;/H2&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;References&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;[Delta Live Tables Limitations &amp;amp; Best Practices]&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;[How to use CDC &amp;amp; Upserts with Delta Tables]&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Summary&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Ingest both streams to separate persistent Delta tables.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Run scheduled upsert jobs to consolidate into a single Delta Table (data_predictions) with history.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Avoid streaming joins and self-updates within DLT—use batch merge logic.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Leverage Delta Lake for historical queries and late data handling.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;This approach meets your requirements and works reliably for CDC and late-arriving updates in DLT.&lt;/P&gt;</description>
    <pubDate>Sun, 16 Nov 2025 17:24:25 GMT</pubDate>
    <dc:creator>mark_ott</dc:creator>
    <dc:date>2025-11-16T17:24:25Z</dc:date>
    <item>
      <title>Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs</title>
      <link>https://community.databricks.com/t5/get-started-discussions/updating-a-delta-table-in-delta-live-tables-dlt-from-two-event/m-p/110518#M9048</link>
      <description>&lt;P class=""&gt;I am working with Databricks Delta Live Tables (DLT) and need to ingest data from two different Event Hubs. My goal is to:&lt;/P&gt;&lt;OL class=""&gt;&lt;LI&gt;Ingest initial data from the first Event Hub (Predictor) and store it in a Delta Table (data_predictions).&lt;/LI&gt;&lt;LI&gt;Later, update this table when related data arrives from the second Event Hub (Confirmer).&lt;/LI&gt;&lt;LI&gt;Each message from the second Event Hub contains a uuid, and if it matches an existing uuid in data_predictions, the corresponding record should be updated with the new fields from the Confirmer message&lt;/LI&gt;&lt;LI&gt;Ensure that data_predictions remains a Delta Table (not a Materialized View) and retains historical records even if the pipeline is restarted.&lt;/LI&gt;&lt;/OL&gt;&lt;P class=""&gt;Challenges I'm facing:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;DLT does not allow referring to the same table (data_predictions) multiple times within the same pipeline.&lt;/LI&gt;&lt;LI&gt;If I try to directly update data_predictions using apply_changes(), DLT breaks because it conflicts with the table creation.&lt;/LI&gt;&lt;LI&gt;If I perform a join between two streaming sources, Spark forces the output to be a Materialized View, which loses historical data upon pipeline restarts.&lt;/LI&gt;&lt;LI&gt;I need to process late-arriving updates: The Confirmer data may arrive days or weeks after the Predictor data.&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;What I need:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;A DLT pipeline that allows ingesting and persisting data from the Predictor Event Hub.&lt;/LI&gt;&lt;LI&gt;A way to update the same data_predictions table when data from the Confirmer Event Hub arrives.&lt;/LI&gt;&lt;LI&gt;A robust solution that prevents data loss and ensures updates work even if the Confirmer data is significantly delayed.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;My dlt code:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;.table(name="staging_data_predictor", temporary=True)
def table_data_predictor():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_PREDICTOR)
        .load()
    )

    df_parsed = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema).alias("data")) \
        .select(
            col("data.uuid"),
            to_json(col("data.predictions")).alias("predictions"),
            col("data.invoice_data.*"),
        )

    df_predictor = df_parsed \
        .withColumn("timestamp", current_timestamp()) \
        .withColumn("code1", lit(None).cast(StringType())) \
        .withColumn("code2", lit(None).cast(StringType())) \
        .withColumn("timestamp_update", lit(None).cast(TimestampType()))

    return df_predictor

# Vista con i dati aggiornati da Event Hub Confirmer
@dlt.table(name="staging_data_confirmer", temporary=True)
def table_data_confirmer():
    df_raw = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_CONFIRMER)
        .load()
    )

    df_confirmer = df_raw.selectExpr("CAST(value AS STRING) AS json_data") \
        .select(from_json(col("json_data"), schema_update).alias("data")) \
        .select("data.*") \
        .withColumn("timestamp_update", current_timestamp()) 

    return df_confirmer

@dlt.table(name="data_predictions")
def table_data_predictions():
    df_predictor = dlt.read("staging_data_predictor")
    df_confirmer = dlt.read("staging_data_confirmer")

    #Update dei record esistenti con dati da confirmer
    df_updated_records = df_predictor.alias("pred") \
        .join(df_confirmer.alias("conf"), "uuid", "left") \
        .select(
            col("pred.uuid"),
            col("pred.predictions"),
            coalesce(col("conf.code1"), col("pred.code1")).alias("code1"),
            coalesce(col("conf.code2"), col("pred.code2")).alias("code2"),
            when(
                col("conf.code1").isNotNull() | col("conf.code2").isNotNull(),
                current_timestamp()
            ).otherwise(col("pred.timestamp_update")).alias("timestamp_update")
        )

    return df_updated_records&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P class=""&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Right now, data_predictions is a Materialized View, meaning it only contains the latest data in the pipeline execution and loses historical records when the pipeline restarts.Updates from the Confirmer Event Hub only affect currently available records in the pipeline, but do not modify historical data that was ingested in previous executions.&lt;BR /&gt;&lt;BR /&gt;What is the best approach to achieve this in DLT without creating multiple persistent tables while ensuring data_predictions remains a Delta Table?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P class=""&gt;Any suggestions or best practices would be greatly appreciated!&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 18 Feb 2025 17:12:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/updating-a-delta-table-in-delta-live-tables-dlt-from-two-event/m-p/110518#M9048</guid>
      <dc:creator>Radix95</dc:creator>
      <dc:date>2025-02-18T17:12:24Z</dc:date>
    </item>
    <item>
      <title>Re: Updating a Delta Table in Delta Live Tables (DLT) from Two Event Hubs</title>
      <link>https://community.databricks.com/t5/get-started-discussions/updating-a-delta-table-in-delta-live-tables-dlt-from-two-event/m-p/139234#M11029</link>
      <description>&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;To achieve robust, persistent CDC (Change Data Capture)–style updates in Databricks DLT with your scenario—&lt;STRONG&gt;while keeping data_predictions as a Delta Table (not a Materialized View)&lt;/STRONG&gt;—you need to carefully avoid streaming joins and side effects across streaming tables in a single DLT pipeline, because of DLT’s constraints on table references and state handling.&lt;/P&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Below is a best-practice architecture and pipeline pattern for your use case:&lt;/P&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Key Solution Principles&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Ingest Predictor and Confirmer streams each to their own staging Delta Tables&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(no materialized views or joins here).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Make data_predictions a Delta Table&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;with historical CDC semantics (append-only with late-arriving data).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Perform updates via&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;MERGE INTO&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(upserts) using a batch job or a separate DLT pipeline&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;instead of streaming/SQL views, as true updates on existing Delta Tables from multiple sources in a single DLT pipeline are not well supported.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Retain historical data&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;by&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;enabling Delta Lake time travel&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;and&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;mergeSchema&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;options, so even after pipeline restarts, your table’s history is intact and queryable.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Step-by-Step Pipeline Design&lt;/H2&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;1. Stage Each Stream in Its Own Raw Table&lt;/H2&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;python&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;table&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;name&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_predictor"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;def&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;raw_predictor&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;:&lt;/SPAN&gt;
    df &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;readStream
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;format&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"kafka"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;options&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token operator"&gt;**&lt;/SPAN&gt;KAFKA_OPTIONS_PREDICTOR&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;load&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;selectExpr&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"CAST(value AS STRING) as json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;from_json&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;col&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; schema&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;# flatten, select, etc...&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"etl_ingest_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; current_timestamp&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;return&lt;/SPAN&gt; df

&lt;SPAN class="token token decorator annotation punctuation"&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token decorator annotation punctuation"&gt;table&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;name&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_confirmer"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;def&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;raw_confirmer&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;:&lt;/SPAN&gt;
    df &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;readStream
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;format&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"kafka"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;options&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token operator"&gt;**&lt;/SPAN&gt;KAFKA_OPTIONS_CONFIRMER&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;load&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;selectExpr&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"CAST(value AS STRING) as json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;from_json&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;col&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"json_data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; schema_update&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"data"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;# flatten, select, etc...&lt;/SPAN&gt;
        &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"confirmer_etl_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; current_timestamp&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;return&lt;/SPAN&gt; df
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;BLOCKQUOTE&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;These are&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;persisted, append-only Delta tables&lt;/STRONG&gt;—not materialized views.&lt;/P&gt;
&lt;/BLOCKQUOTE&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;2. Build/Persist the Gold Table (&lt;CODE&gt;data_predictions&lt;/CODE&gt;) via a Batch Upsert&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Create a separate batch pipeline or notebook&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;that periodically runs a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;MERGE INTO&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;operation to keep&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;current with the latest from both staging tables, using a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;uuid&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;key.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Example batch upsert job:&lt;/P&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;python&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token"&gt;# in a notebook or DLT batch pipeline cell&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Load tables&lt;/SPAN&gt;
df_predictor &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;read&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;table&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_predictor"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
df_confirmer &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; spark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;read&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;table&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"raw_confirmer"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Prepare the upsert source by joining the latest confirmer data&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;from&lt;/SPAN&gt; pyspark&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;sql &lt;SPAN class="token token"&gt;import&lt;/SPAN&gt; functions &lt;SPAN class="token token"&gt;as&lt;/SPAN&gt; F

df_upsert &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;join&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    df_confirmer&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    on&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"uuid"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    how&lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"left"&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;select&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;"uuid"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    &lt;SPAN class="token token"&gt;"predictions"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;coalesce&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;coalesce&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"etl_ingest_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
    F&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;when&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code1"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;isNotNull&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt; &lt;SPAN class="token token operator"&gt;|&lt;/SPAN&gt; df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"code2"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;isNotNull&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; df_confirmer&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"confirmer_etl_ts"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;otherwise&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df_predictor&lt;SPAN class="token token punctuation"&gt;[&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"timestamp_update"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;]&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"timestamp_update"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;

&lt;SPAN class="token token"&gt;# Merge (upsert) into persistent Delta Table&lt;/SPAN&gt;
&lt;SPAN class="token token"&gt;from&lt;/SPAN&gt; delta&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;tables &lt;SPAN class="token token"&gt;import&lt;/SPAN&gt; DeltaTable

deltaTable &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; DeltaTable&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;forPath&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;spark&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;"/mnt/path/to/data_predictions"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
    deltaTable&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"t"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;merge&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;
        df_upsert&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;alias&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"s"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt;
        &lt;SPAN class="token token"&gt;"t.uuid = s.uuid"&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;whenMatchedUpdateAll&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;whenNotMatchedInsertAll&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
    &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;execute&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Schedule this merge as a Databricks Job&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;every few minutes or hours (frequency depends on latency requirements).&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Delta will retain full historical records. Use&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;Delta Time Travel&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(&lt;CODE&gt;VERSION AS OF&lt;/CODE&gt;,&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;TIMESTAMP AS OF&lt;/CODE&gt;), schema evolution, and keep the&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;table&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;as a true Delta Table&lt;/EM&gt;, not a view. Pipelines restarts will not destroy historical data.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Additional Tips&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Never use streaming joins or self-updating tables inside DLT for this CDC merge.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;DLT wants a strictly acyclic DAG for tables, and streaming joins typically result in views—not Delta Tables—and lose history.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Consider watermarking and data retention&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;on the raw tables, if storage is a concern.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Late-arriving data is handled:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;Because all upserts/merges are re-run on new Confirmer data, historical Predictor rows are updated as soon as late Confirmer data arrives.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;You can time travel historical states using Delta’s time travel features:&lt;/P&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded-lg font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[calc(var(--header-height)+var(--size-xs))]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-lg text-xs font-thin" data-testid="code-language-indicator"&gt;sql&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;&lt;SPAN class="token token"&gt;SELECT&lt;/SPAN&gt; &lt;SPAN class="token token operator"&gt;*&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;FROM&lt;/SPAN&gt; data_predictions VERSION &lt;SPAN class="token token"&gt;AS&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;OF&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;10&lt;/SPAN&gt;
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;If you want the&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;data_predictions&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;to always reflect only the most recent state per uuid, keep only the latest row per uuid during the upsert.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Illustrated Workflow&lt;/H2&gt;
&lt;DIV class="group relative"&gt;
&lt;DIV class="w-full overflow-x-auto md:max-w-[90vw] border-subtlest ring-subtlest divide-subtlest bg-transparent"&gt;
&lt;TABLE class="border-subtler my-[1em] w-full table-auto border-separate border-spacing-0 border-l border-t"&gt;
&lt;THEAD class="bg-subtler"&gt;
&lt;TR&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Source&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;DLT Table&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Upsert Engine&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Gold Table (Delta Table)&lt;/TH&gt;
&lt;/TR&gt;
&lt;/THEAD&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Event Hub 1&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;raw_predictor (Delta)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Batch Notebook/Job&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;data_predictions (Delta Table)&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Event Hub 2&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;raw_confirmer (Delta)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;(merge on uuid)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;(all history retained by Delta)&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;/DIV&gt;
&lt;DIV class="bg-base border-subtler shadow-subtle pointer-coarse:opacity-100 right-xs absolute bottom-0 flex rounded-lg border opacity-0 transition-opacity group-hover:opacity-100 [&amp;amp;&amp;gt;*:not(:first-child)]:border-subtle [&amp;amp;&amp;gt;*:not(:first-child)]:border-l"&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;&amp;nbsp;&lt;/H2&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;References&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;[Delta Live Tables Limitations &amp;amp; Best Practices]&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;[How to use CDC &amp;amp; Upserts with Delta Tables]&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;HR /&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Summary&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Ingest both streams to separate persistent Delta tables.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Run scheduled upsert jobs to consolidate into a single Delta Table (data_predictions) with history.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Avoid streaming joins and self-updates within DLT—use batch merge logic.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Leverage Delta Lake for historical queries and late data handling.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;This approach meets your requirements and works reliably for CDC and late-arriving updates in DLT.&lt;/P&gt;</description>
      <pubDate>Sun, 16 Nov 2025 17:24:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/updating-a-delta-table-in-delta-live-tables-dlt-from-two-event/m-p/139234#M11029</guid>
      <dc:creator>mark_ott</dc:creator>
      <dc:date>2025-11-16T17:24:25Z</dc:date>
    </item>
  </channel>
</rss>

