<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Designing Reliable Stream–Stream Joins with Watermarks in Databricks in Community Articles</title>
    <link>https://community.databricks.com/t5/community-articles/designing-reliable-stream-stream-joins-with-watermarks-in/m-p/142681#M908</link>
    <description>&lt;P&gt;Interesting and very Insightful&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/48750"&gt;@Divaker_Soni&lt;/a&gt;.&lt;span class="lia-unicode-emoji" title=":grinning_face:"&gt;😀&lt;/span&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 30 Dec 2025 10:11:18 GMT</pubDate>
    <dc:creator>TejeshS</dc:creator>
    <dc:date>2025-12-30T10:11:18Z</dc:date>
    <item>
      <title>Designing Reliable Stream–Stream Joins with Watermarks in Databricks</title>
      <link>https://community.databricks.com/t5/community-articles/designing-reliable-stream-stream-joins-with-watermarks-in/m-p/142620#M902</link>
      <description>&lt;P class=""&gt;Stream–stream joins are one of the most powerful features in Databricks Structured Streaming – and also one of the easiest to misconfigure. As soon as you move from simple append-only pipelines to real-time correlations across multiple streams (orders vs payments, clicks vs impressions, IoT readings vs alerts), you run into questions like:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;How do you prevent unbounded state growth in memory?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;How do you control how long Spark should wait for late events?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;What happens when the business wants a left join, but the platform complains about missing watermarks?&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;This post walks through how to design robust stream–stream joins with watermarks in Databricks, focusing on:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Why watermarks are effectively mandatory for non-trivial stream–stream joins.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;How watermarks interact with join types and time-range conditions.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;What the common “append mode + outer join” error is really telling you.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Practical design patterns that work in production (stream–stream, stream–static, and late-event handling).&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;HR /&gt;&lt;H2 id="1-why-streamstream-joins-need-watermarks"&gt;1. Why Stream–Stream Joins Need Watermarks&lt;/H2&gt;&lt;P class=""&gt;A stream–stream join keeps state: Spark must hold past records from both sides until it is confident there is no more matching data for them. Without a bound, that state can grow without limit.&lt;/P&gt;&lt;P class=""&gt;Watermarks give Spark a time fence on event-time columns. 
Conceptually, a watermark on column&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;event_time&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;with&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;"10 minutes"&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;says:&lt;/P&gt;&lt;BLOCKQUOTE&gt;&lt;P class=""&gt;“I will not get data older than max(event_time_seen) – 10 minutes.”&lt;/P&gt;&lt;/BLOCKQUOTE&gt;&lt;P class=""&gt;This allows Spark to safely drop old state from the join buffer once rows fall behind the watermark (plus any join window you define).&lt;/P&gt;&lt;P class=""&gt;Example (Scala):&lt;/P&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;DIV class=""&gt;scala&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;val&lt;/SPAN&gt; orders &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;readStream &lt;SPAN class=""&gt;.&lt;/SPAN&gt;format&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;option&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"json"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;load&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"/raw/orders"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; col&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;cast&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN 
class=""&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"10 minutes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;val&lt;/SPAN&gt; payments &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;readStream &lt;SPAN class=""&gt;.&lt;/SPAN&gt;format&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;option&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"json"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;load&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"/raw/payments"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; col&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;cast&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"20 minutes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P class=""&gt;Here Spark:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Tracks the maximum&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;event_time&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;seen for each stream.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Periodically evicts join state that 
is older than the corresponding watermark plus the configured join window.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Without this contract, Spark has no guarantee that data will not arrive infinitely late, so it cannot safely clear state for a stream–stream join.&lt;/P&gt;&lt;HR /&gt;&lt;H2 id="2-understanding-the-common-outer-join-error"&gt;2. Understanding the Common Outer-Join Error&lt;/H2&gt;&lt;P class=""&gt;Many people eventually hit an error along the lines of:&lt;/P&gt;&lt;BLOCKQUOTE&gt;&lt;P class=""&gt;Append mode is not supported for stream-stream left outer join between two streaming DataFrames/Datasets without watermark in the join keys, or watermark on the nullable side and an appropriate range condition.&lt;/P&gt;&lt;/BLOCKQUOTE&gt;&lt;P class=""&gt;Translated into practical terms:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;You are doing a left (or right/full) stream–stream join in append mode.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Spark does not see a watermark on the time column involved in the join condition, or there is no time-range condition that uses that column.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;For outer joins in append mode, Spark must know when a row on the preserved side is “final” – meaning no more matches can arrive. “Final” is defined using:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;A watermark on the nullable side (the side that can produce nulls after the join).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;A time-range condition in the join predicate that constrains how late a matching row may arrive.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;If either of these is missing, Spark cannot safely decide when to emit a final result and drop state, so it rejects the query in append mode.&lt;/P&gt;&lt;HR /&gt;&lt;H2 id="3-building-a-correct-streamstream-join"&gt;3. 
Building a Correct Stream–Stream Join&lt;/H2&gt;&lt;P class=""&gt;Consider a classic pattern:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;orders_stream: main business events (purchase orders).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;shipments_stream: related events (shipment confirmations, tracking updates).&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;You want a joined stream of&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(order, shipment)&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;records with:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;All orders.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Matching shipments that arrive within a reasonable time window.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;A clear strategy for shipments that arrive too late.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;3.1 Inner join with watermarks&lt;/H2&gt;&lt;P class=""&gt;Start with a time-bounded inner join where both sides are streaming:&lt;/P&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;DIV class=""&gt;scala&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;import&lt;/SPAN&gt; &lt;SPAN class=""&gt;org&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;SPAN class=""&gt;apache&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;SPAN class=""&gt;spark&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;SPAN class=""&gt;sql&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;SPAN class=""&gt;functions&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;_ &lt;SPAN class=""&gt;val&lt;/SPAN&gt; orders &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;readStream &lt;SPAN class=""&gt;.&lt;/SPAN&gt;format&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;option&lt;SPAN 
class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"json"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;load&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"/raw/orders"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; col&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;cast&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"10 minutes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;val&lt;/SPAN&gt; shipments &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;readStream &lt;SPAN class=""&gt;.&lt;/SPAN&gt;format&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;option&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"json"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;load&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"/raw/shipments"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; col&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN 
class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;cast&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"20 minutes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;val&lt;/SPAN&gt; joined &lt;SPAN class=""&gt;=&lt;/SPAN&gt; orders&lt;SPAN class=""&gt;.&lt;/SPAN&gt;as&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"orders"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;join&lt;SPAN class=""&gt;(&lt;/SPAN&gt; shipments&lt;SPAN class=""&gt;.&lt;/SPAN&gt;as&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"shipments"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; expr&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;""" &lt;/SPAN&gt;&lt;SPAN class=""&gt;orders.order_id = shipments.order_id AND &lt;/SPAN&gt;&lt;SPAN class=""&gt;shipments.event_time BETWEEN orders.event_time AND orders.event_time + INTERVAL 10 MINUTES &lt;/SPAN&gt;&lt;SPAN class=""&gt;"""&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; joinType &lt;SPAN class=""&gt;=&lt;/SPAN&gt; &lt;SPAN class=""&gt;"inner"&lt;/SPAN&gt; &lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P class=""&gt;Key points:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Both sides have watermarks (10 and 20 minutes).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Both DataFrames are aliased with .as(...) so that the orders.* and shipments.* references inside expr resolve unambiguously.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;The join condition includes a time-range predicate on the event-time column.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;This defines a finite window where matches are allowed:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Shipments may arrive after orders, but only within 10 minutes of&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;orders.event_time.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Once orders are older than the effective watermark for shipments minus the allowed window, Spark can safely evict them from 
state.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;3.2 Left outer join with watermark on the nullable side&lt;/H2&gt;&lt;P class=""&gt;Now imagine the requirement changes:&lt;/P&gt;&lt;BLOCKQUOTE&gt;&lt;P class=""&gt;“Keep all orders even if there is no shipment (yet).”&lt;/P&gt;&lt;/BLOCKQUOTE&gt;&lt;P class=""&gt;This requires a left outer join. Spark only supports this in append mode if:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;The nullable side (here,&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;shipments) has a watermark on an event-time column.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;The join condition includes a corresponding time-range predicate.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Example:&lt;/P&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;DIV class=""&gt;scala&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;val&lt;/SPAN&gt; joinedLeft &lt;SPAN class=""&gt;=&lt;/SPAN&gt; orders&lt;SPAN class=""&gt;.&lt;/SPAN&gt;as&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"orders"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;join&lt;SPAN class=""&gt;(&lt;/SPAN&gt; shipments&lt;SPAN class=""&gt;.&lt;/SPAN&gt;as&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"shipments"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; expr&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;""" &lt;/SPAN&gt;&lt;SPAN class=""&gt;orders.order_id = shipments.order_id AND &lt;/SPAN&gt;&lt;SPAN class=""&gt;shipments.event_time BETWEEN orders.event_time AND orders.event_time + INTERVAL 10 MINUTES &lt;/SPAN&gt;&lt;SPAN class=""&gt;"""&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; joinType &lt;SPAN class=""&gt;=&lt;/SPAN&gt; &lt;SPAN class=""&gt;"leftOuter"&lt;/SPAN&gt; &lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P class=""&gt;Logically:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Spark holds orders in state and waits for shipments within the configured window.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Orders that find a matching shipment are emitted as soon as the match arrives.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Once the 
watermark has moved past the end of an order’s match window on&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;shipments.event_time&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;and no shipment has matched, Spark considers that order “final” and emits the row with&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;null&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;shipment columns.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;State for that order is then dropped.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;If the watermark or the time-range condition is missing, you get the outer-join append-mode error described above.&lt;/P&gt;&lt;HR /&gt;&lt;H2 id="4-when-you-need-full-history-on-one-side"&gt;4. When You Need “Full History” on One Side&lt;/H2&gt;&lt;P class=""&gt;A very common real-world constraint is:&lt;/P&gt;&lt;BLOCKQUOTE&gt;&lt;P class=""&gt;“I cannot apply a watermark on the shipments stream; I need full historical shipment data for lookups.”&lt;/P&gt;&lt;/BLOCKQUOTE&gt;&lt;P class=""&gt;In other words, you want to enrich a streaming&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;orders&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;flow with a full, ever-growing reference dataset (for example, a catalog of shipment codes or carrier details). 
With two truly streaming sides, you cannot have “infinite history, no watermark, and left outer join in append mode” at the same time; state must be bounded.&lt;/P&gt;&lt;P class=""&gt;Patterns that work in practice:&lt;/P&gt;&lt;H2&gt;4.1 Convert the reference side to a static table (stream–static join)&lt;/H2&gt;&lt;P class=""&gt;If one side can be materialized:&lt;/P&gt;&lt;OL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Ingest/update the reference stream into a Delta table (possibly with merge / SCD logic).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Use that table as a static side in a streaming job:&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;DIV&gt;&lt;DIV class=""&gt;scala&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;val&lt;/SPAN&gt; shipmentCatalog &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;read&lt;SPAN class=""&gt;.&lt;/SPAN&gt;table&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"reference.shipment_codes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;val&lt;/SPAN&gt; orders &lt;SPAN class=""&gt;=&lt;/SPAN&gt; spark&lt;SPAN class=""&gt;.&lt;/SPAN&gt;readStream &lt;SPAN class=""&gt;.&lt;/SPAN&gt;format&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;option&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"json"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;load&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"/raw/orders"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withColumn&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN 
class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; col&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;cast&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"event_time"&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"10 minutes"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;val&lt;/SPAN&gt; joined &lt;SPAN class=""&gt;=&lt;/SPAN&gt; orders&lt;SPAN class=""&gt;.&lt;/SPAN&gt;join&lt;SPAN class=""&gt;(&lt;/SPAN&gt; shipmentCatalog&lt;SPAN class=""&gt;,&lt;/SPAN&gt; Seq&lt;SPAN class=""&gt;(&lt;/SPAN&gt;&lt;SPAN class=""&gt;"shipment_code"&lt;/SPAN&gt;&lt;SPAN class=""&gt;)&lt;/SPAN&gt;&lt;SPAN class=""&gt;,&lt;/SPAN&gt; &lt;SPAN class=""&gt;"leftOuter"&lt;/SPAN&gt; &lt;SPAN class=""&gt;)&lt;/SPAN&gt; &lt;SPAN class=""&gt;// stream-static left join&lt;/SPAN&gt; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P class=""&gt;Benefits:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;No watermark needed on the static side.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;No time-range requirement in the join condition.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;The join itself is stateless, so there is no join state to grow.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;This is usually the best choice when you truly need the full history from one side.&lt;/P&gt;&lt;H2&gt;4.2 Two-stage architecture for “full history + streaming”&lt;/H2&gt;&lt;P class=""&gt;If the reference source is naturally streaming, but you still need its full history:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;&lt;STRONG&gt;Stage 1:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;Ingest the reference 
stream into a Delta table via a simple streaming or DLT pipeline.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;&lt;STRONG&gt;Stage 2:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;Use that Delta table as the static side in a stream–static join with the main streaming flow (orders).&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;This separates “stateful ingestion” from “stateful joining,” which typically simplifies operations, monitoring, and recovery.&lt;/P&gt;&lt;HR /&gt;&lt;H2 id="5-handling-late-arrivals-side-outputs-and-reproces"&gt;5. Handling Late Arrivals: Side Outputs and Reprocessing&lt;/H2&gt;&lt;P class=""&gt;Even with carefully chosen watermarks, late events are inevitable (replays, network delays, upstream backlog, etc.). Production-grade designs rarely just drop them silently.&lt;/P&gt;&lt;P class=""&gt;A robust strategy:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Define a main join window where “on-time” events land in the primary output.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Route late events to a dedicated “late_events” table for offline reconciliation.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Downstream, a batch job can:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Try to reconcile late events with existing orders.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Fix the main table via updates/merges.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Mark late records as processed and, optionally, generate alerts if late volume spikes.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;One common pattern is:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Main stream: events that satisfy the join’s time-window predicate.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Late stream: events that do not satisfy it or arrive after the watermark, often modeled with filters or left-anti joins.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;In DLT or PySpark, pipelines often add flags such 
as&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;valid_flag,&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;late_flag, and&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;reason_code, plus a dedicated&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;late_events&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;table to make downstream operations explicit.&lt;/P&gt;&lt;HR /&gt;&lt;H2 id="6-practical-checklist-for-streamstream-joins"&gt;6. Practical Checklist for Stream–Stream Joins&lt;/H2&gt;&lt;P class=""&gt;When designing your next stream–stream join in Databricks, walk through this checklist:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Do both sides really need to be streaming?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;If one side can be materialized as a Delta table, prefer a stream–static join.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Which join type do you actually need?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Inner joins are simpler and more forgiving.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Outer joins in append mode require watermarks and time-range predicates.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;What is your event-time column?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Ensure it is a true timestamp and normalize time zones early (UTC is safest).&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Where are the watermarks?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Optional for inner joins, but needed on both sides (together with a time-range condition) if Spark is to clean up join state.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;On the nullable side for outer joins, plus a time-range condition.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;What is the acceptable late-arrival window?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Express it explicitly in the watermark definition and in the join condition.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Align it with business SLAs (for example, “shipments may arrive up to 
15 minutes late”).&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;What is the plan for late events?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Separate table for late records.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Reconciliation/merge process.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Alerting when late volume exceeds a threshold.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;How will you monitor the state and performance?&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;Use the Structured Streaming UI and metrics.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Track state-store memory, processing latency, and dropped-late counts.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Adjust watermarks and windows using real traffic patterns.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;HR /&gt;&lt;H2 id="7-closing-thoughts"&gt;7. Closing Thoughts&lt;/H2&gt;&lt;P class=""&gt;Reliable stream–stream joins in Databricks are less about clever code and more about clear contracts around time:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;How out-of-order is your data allowed to be?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;How late can events arrive and still be considered valid?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;When is a join result considered final?&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Watermarks and time-range conditions encode these contracts into the platform so that:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;State remains bounded and predictable.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Join outputs are logically consistent.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;Late data is handled deliberately instead of being lost by accident.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Mon, 29 Dec 2025 09:22:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/community-articles/designing-reliable-stream-stream-joins-with-watermarks-in/m-p/142620#M902</guid>
      <dc:creator>Divaker_Soni</dc:creator>
      <dc:date>2025-12-29T09:22:44Z</dc:date>
    </item>
    <item>
      <title>Re: Designing Reliable Stream–Stream Joins with Watermarks in Databricks</title>
      <link>https://community.databricks.com/t5/community-articles/designing-reliable-stream-stream-joins-with-watermarks-in/m-p/142681#M908</link>
      <description>&lt;P&gt;Interesting and very Insightful&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/48750"&gt;@Divaker_Soni&lt;/a&gt;.&lt;span class="lia-unicode-emoji" title=":grinning_face:"&gt;😀&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 30 Dec 2025 10:11:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/community-articles/designing-reliable-stream-stream-joins-with-watermarks-in/m-p/142681#M908</guid>
      <dc:creator>TejeshS</dc:creator>
      <dc:date>2025-12-30T10:11:18Z</dc:date>
    </item>
  </channel>
</rss>

