<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: delta as streaming source, can the reader reads only newly appended rows? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138796#M51015</link>
    <description>&lt;P&gt;Thanks&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/193092"&gt;@bianca_unifeye&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;As the trigger is AvailableNow, each trigger starts with a new Spark session. Then, with the following setting, the docs say: "&lt;SPAN&gt;By default, the stream returns the latest snapshot of the table when the stream first starts as an&amp;nbsp;&lt;/SPAN&gt;INSERT&lt;SPAN&gt;&amp;nbsp;and future changes as change data".&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;In other words, enabling CDF or not makes no difference in this case.&lt;/P&gt;&lt;P&gt;To get only the latest changes, I had to set&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;.option("startingVersion", "latest")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;But joining multiple streaming tables that way will be problematic.&lt;BR /&gt;&lt;BR /&gt;I will explore how AUTO CDC works and see if it is a better solution than streaming from a Delta table.&lt;BR /&gt;&lt;STRONG&gt;Databricks says&lt;/STRONG&gt;:&amp;nbsp;&lt;SPAN&gt;Databricks recommends streaming from the CDC feed of a Delta table (option 1) rather than the Delta table itself (option 2) whenever possible.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Any comments and advice are appreciated.&lt;BR /&gt;Na.&lt;/P&gt;</description>
    <pubDate>Wed, 12 Nov 2025 14:55:49 GMT</pubDate>
    <dc:creator>cdn_yyz_yul</dc:creator>
    <dc:date>2025-11-12T14:55:49Z</dc:date>
    <item>
      <title>delta as streaming source, can the reader reads only newly appended rows?</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138474#M50935</link>
      <description>&lt;P&gt;Hello everyone,&lt;/P&gt;&lt;P&gt;In our implementation of the Medallion Architecture, we want to stream changes with Spark Structured Streaming. I would like some advice on how to use a Delta table as a source correctly, and whether there is a performance (memory usage) concern in the long run.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Summary of the scenario:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Source: Delta table, append-only&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# to read from source
df = spark.readStream.format("delta").table("table_name")&lt;/LI-CODE&gt;&lt;P&gt;Sink: Delta table&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# to write to sink (DataStreamWriter uses toTable, not table)
df.writeStream.format("delta").outputMode("append").option(
    "checkpointLocation", "location"
).trigger(availableNow=True).toTable("target_table_name")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Steps used during testing&lt;/STRONG&gt;:&lt;/P&gt;&lt;P&gt;(the final implementation runs the same sequence of operations)&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Read three Delta tables as df1, df2, df3.&lt;/LI&gt;&lt;LI&gt;Inner join them without setting watermarks or time-window constraints.&lt;/LI&gt;&lt;LI&gt;Write to the target Delta table.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Observed that df1-df3 contain all the rows of their respective Delta tables after each trigger.&lt;/LI&gt;&lt;/OL&gt;&lt;P class="lia-indent-padding-left-30px"&gt;For example, in a clean environment where the source tables contain x1, x2 and x3 rows, after the initial run the three DataFrames contain x1, x2 and x3 rows respectively. The join and the write to the target then complete.&lt;/P&gt;&lt;P class="lia-indent-padding-left-30px"&gt;2. After appending new rows to the source Delta tables and triggering the stream read again, each of df1-df3 contains the rows of its entire source table, i.e., x + new rows. The run then continues and finishes.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;It is true that the stream writer writes only the new results to the sink. But&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;will the join take more and more memory and process more and more rows as the data grows&lt;/STRONG&gt;?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;The Databricks docs say&lt;/STRONG&gt;:&lt;/P&gt;&lt;P&gt;You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.&lt;/P&gt;&lt;P&gt;&lt;U&gt;startingVersion:&lt;/U&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;The Delta Lake version to start from. Databricks recommends omitting this option for most workloads. 
When not set, the stream starts from the latest available version including a complete snapshot of the table at that moment and future changes as change data.&lt;/P&gt;&lt;P&gt;&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/delta-lake#specify-initial-position" target="_blank" rel="nofollow noopener noreferrer"&gt;https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/delta-lake#specify-initial-p...&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="javascript"&gt;The initial run:
   "stateOperators": [
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 48847,
            "numRowsUpdated": 48847,
            "allUpdatesTimeMs": 324994,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 5,
            "commitTimeMs": 50117,
            "memoryUsedBytes": 4246347944,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics":...},

        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 48858,
            "numRowsUpdated": 48858,
            "allUpdatesTimeMs": 325977,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 0,
            "commitTimeMs": 54268,
            "memoryUsedBytes": 1048889302,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics":.....&lt;/LI-CODE&gt;&lt;LI-CODE lang="javascript"&gt;During the second run:

    "stateOperators": [
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 61057,
            "numRowsUpdated": 12210,
            "allUpdatesTimeMs": 352948,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 5,
            "commitTimeMs": 63948,
            "memoryUsedBytes": 7248908645,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics": {...}
        },
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 61073,
            "numRowsUpdated": 12215,
            "allUpdatesTimeMs": 413147,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 7,
            "commitTimeMs": 73142,
            "memoryUsedBytes": 8876238984,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,......&lt;/LI-CODE&gt;&lt;P&gt;Your advice is appreciated.&lt;BR /&gt;Na.&lt;/P&gt;</description>
      <pubDate>Mon, 10 Nov 2025 20:53:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138474#M50935</guid>
      <dc:creator>cdn_yyz_yul</dc:creator>
      <dc:date>2025-11-10T20:53:21Z</dc:date>
    </item>
    <item>
      <title>Re: delta as streaming source, can the reader reads only newly appended rows?</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138623#M50979</link>
      <description>&lt;P&gt;&lt;STRONG&gt;First of all&lt;/STRONG&gt;, you are using &lt;STRONG&gt;append-only reads&lt;/STRONG&gt;, which means that every time your stream triggers, Spark will process &lt;STRONG&gt;the entire Delta snapshot&lt;/STRONG&gt; rather than just the changes.&lt;BR /&gt;That’s why you’re observing the memory usage increase after each run. It’s not a bug; it’s how &lt;STRONG&gt;Spark Structured Streaming&lt;/STRONG&gt; works under the hood.&lt;/P&gt;&lt;H2&gt;Use Delta Change Data Feed (CDF) instead&lt;/H2&gt;&lt;P&gt;Change Data Feed (CDF) is a built-in Delta feature that lets you read &lt;STRONG&gt;only the changes (inserts, updates, deletes)&lt;/STRONG&gt; instead of the full dataset.&lt;BR /&gt;When you enable it, Spark treats the Delta table as a &lt;STRONG&gt;true incremental stream source.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/aws/en/delta/delta-change-data-feed" target="_blank" rel="noopener"&gt;https://docs.databricks.com/aws/en/delta/delta-change-data-feed&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 11 Nov 2025 16:44:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138623#M50979</guid>
      <dc:creator>bianca_unifeye</dc:creator>
      <dc:date>2025-11-11T16:44:16Z</dc:date>
    </item>
    <item>
      <title>Re: delta as streaming source, can the reader reads only newly appended rows?</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138796#M51015</link>
      <description>&lt;P&gt;Thanks&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/193092"&gt;@bianca_unifeye&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;As the trigger is AvailableNow, each trigger starts with a new Spark session. Then, with the following setting, the docs say: "&lt;SPAN&gt;By default, the stream returns the latest snapshot of the table when the stream first starts as an&amp;nbsp;&lt;/SPAN&gt;INSERT&lt;SPAN&gt;&amp;nbsp;and future changes as change data".&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;In other words, enabling CDF or not makes no difference in this case.&lt;/P&gt;&lt;P&gt;To get only the latest changes, I had to set&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;.option("startingVersion", "latest")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;But joining multiple streaming tables that way will be problematic.&lt;BR /&gt;&lt;BR /&gt;I will explore how AUTO CDC works and see if it is a better solution than streaming from a Delta table.&lt;BR /&gt;&lt;STRONG&gt;Databricks says&lt;/STRONG&gt;:&amp;nbsp;&lt;SPAN&gt;Databricks recommends streaming from the CDC feed of a Delta table (option 1) rather than the Delta table itself (option 2) whenever possible.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Any comments and advice are appreciated.&lt;BR /&gt;Na.&lt;/P&gt;</description>
      <pubDate>Wed, 12 Nov 2025 14:55:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138796#M51015</guid>
      <dc:creator>cdn_yyz_yul</dc:creator>
      <dc:date>2025-11-12T14:55:49Z</dc:date>
    </item>
    <item>
      <title>Re: delta as streaming source, can the reader reads only newly appended rows?</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138813#M51020</link>
      <description>&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;In your scenario using Medallion Architecture with Delta tables as both streaming source and sink, it is important to understand Spark Structured Streaming behavior and performance characteristics, especially with joins and memory usage. Here is a direct, actionable analysis based on your detailed setup and observed metrics.&lt;/P&gt;
&lt;H2 id="how-delta-table-as-source-works-in-streaming" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;How Delta Table as Source Works in Streaming&lt;/H2&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;When you use a Delta table as a streaming source (&lt;CODE&gt;spark.readStream.format("delta").table("table_name")&lt;/CODE&gt;), Spark Structured Streaming tracks new data files appended to the Delta log. On initial start, unless you specify&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;"startingVersion"&lt;/CODE&gt;, Spark reads a full snapshot of the table (all rows). For every subsequent trigger, only new files/data are processed as micro-batch increments.&lt;/P&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;Key behavior:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;On the&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;first trigger&lt;/STRONG&gt;, the read covers the entire table.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;On&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;subsequent triggers&lt;/STRONG&gt;, it reads&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;only new appended files&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(new data) since the last checkpoint.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
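&lt;P&gt;The behavior described above can be illustrated with a toy model in plain Python. This is a hypothetical sketch: ToyDeltaTable and ToyStreamReader are made-up names, not Delta Lake or Spark APIs, and the model ignores files, schemas and rate limits. It only shows how a checkpointed offset turns the full first read into incremental later reads.&lt;/P&gt;

```python
from dataclasses import dataclass, field


@dataclass
class ToyDeltaTable:
    """Hypothetical append-only table: versions[v] holds the rows added by commit v."""
    versions: list = field(default_factory=list)

    def append(self, rows):
        self.versions.append(list(rows))

    def snapshot(self, version):
        # All rows visible as of `version` (append-only, so concatenate commits 0..version).
        return [row for commit in self.versions[: version + 1] for row in commit]


@dataclass
class ToyStreamReader:
    """Hypothetical reader that remembers the last processed version, like a checkpoint."""
    table: ToyDeltaTable
    checkpoint: int = -1  # -1 means no checkpoint yet

    def run_available_now(self):
        latest = len(self.table.versions) - 1
        if self.checkpoint == -1:
            # First start without startingVersion: full snapshot of the latest version.
            batch = self.table.snapshot(latest)
        else:
            # Later runs: only the commits made after the checkpointed version.
            new_commits = self.table.versions[self.checkpoint + 1 :]
            batch = [row for commit in new_commits for row in commit]
        self.checkpoint = latest
        return batch


if __name__ == "__main__":
    table = ToyDeltaTable()
    table.append(["a", "b"])
    table.append(["c"])
    reader = ToyStreamReader(table)
    print(reader.run_available_now())  # ['a', 'b', 'c']  (initial snapshot)
    table.append(["d"])
    print(reader.run_available_now())  # ['d']  (only the new commit)
```

&lt;P&gt;In this model, as with a real checkpoint, starting a fresh reader without the saved offset goes back to a full snapshot read.&lt;/P&gt;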
&lt;H2 id="why-joins-appear-to-process-all-rows-each-trigger" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;Why Joins Appear to Process All Rows Each Trigger&lt;/H2&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;What you observed:&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;On the initial run, df1-df3 have all current rows, π1, π2, π3. Join+write occur.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;After appending new rows, and triggering, you see df1-df3 now have π1+n, π2+m, π3+k rows, always including all past data.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;This happens because:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Each trigger’s streaming DataFrame by default represents all unprocessed data (since last checkpoint), but&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;should not&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;include all rows from the physical table every time, unless you configured "availableNow=true" -- in which case, you get a bounded (batch-like) execution and a full scan occurs.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;If you run with&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;.trigger(availableNow=True)&lt;/CODE&gt;, the micro-batch will process the full available data each time; this is intended for one-off refreshes, not continuous streaming.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;For normal&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;.trigger(processingTime='interval')&lt;/CODE&gt;, each trigger sees&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;just new data&lt;/EM&gt;, leading to much lower processing/memory.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
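&lt;P&gt;The metrics posted in the question are consistent with incremental reads. In the second run, numRowsUpdated (12,210) is only the increment, and 48,847 + 12,210 = 61,057 is exactly numRowsTotal, so the other 48,847 rows were carried over in join state from the first run. The short check below reproduces that arithmetic; it assumes numRowsUpdated counts rows added to state during the run, and only the fields it uses are copied from the question.&lt;/P&gt;

```python
# Figures copied from the stateOperators metrics in the question (other fields omitted).
runs = {
    "operator_1": [
        {"numRowsTotal": 48847, "numRowsUpdated": 48847, "numRowsRemoved": 0},  # initial run
        {"numRowsTotal": 61057, "numRowsUpdated": 12210, "numRowsRemoved": 0},  # second run
    ],
    "operator_2": [
        {"numRowsTotal": 48858, "numRowsUpdated": 48858, "numRowsRemoved": 0},
        {"numRowsTotal": 61073, "numRowsUpdated": 12215, "numRowsRemoved": 0},
    ],
}


def rows_carried_over(metrics):
    """Rows already in join state before this run (assumes numRowsUpdated counts
    rows added to state in this run)."""
    return metrics["numRowsTotal"] - metrics["numRowsUpdated"] + metrics["numRowsRemoved"]


for name, (first, second) in runs.items():
    carried = rows_carried_over(second)
    # The second run read only the increment; the rest is state from the first run.
    print(name, "new rows:", second["numRowsUpdated"], "carried over:", carried,
          "equals first total:", carried == first["numRowsTotal"])
```
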
&lt;H2 id="memory-growth-the-real-concern-with-joins" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;Memory Growth: The Real Concern with Joins&lt;/H2&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Structured Streaming joins (especially inner/outer) build in-memory state for joined keys. If you do&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;not set watermark&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;or retention duration, the in-memory state&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;grows unbounded&lt;/STRONG&gt;, because Spark assumes all historic data may still match a late-arriving record.&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;The state store (see "numRowsTotal", "memoryUsedBytes" in your metrics) accumulates old data.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Each symmetric hash join instance holds all seen join keys&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;until&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;it can safely evict data, which only happens with event time watermarks.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;Without Watermarks:&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Memory usage increases with each new batch, as old keys are never "timed out" or removed.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;This can easily lead to OOM (Out Of Memory) errors or excessive state store growth, especially in large tables.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;H2 class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0"&gt;With Watermarks:&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Watermarks let Spark know “data older than this is safe to drop from memory,” so the state store remains bounded.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;For append-only tables and inner joins, set watermarks on the event-time column and ideally design joins so only a recent window of keys must be kept in memory.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
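&lt;P&gt;To make the state-growth argument concrete, here is a hypothetical toy model in plain Python (ToyStreamJoin is an illustrative name, not Spark code). It buffers both inputs like a symmetric hash join, matches rows on key plus an event-time range, and evicts rows only when a watermark delay is set. For simplicity it uses a single watermark over both inputs, whereas Spark tracks one per input and by default uses the minimum.&lt;/P&gt;

```python
from collections import defaultdict


class ToyStreamJoin:
    """Hypothetical model of a stream-stream inner join (not Spark's implementation).

    Rows are (key, event_time) pairs with integer times. A left row and a right row
    match when the keys are equal and right_time lies in [left_time, left_time + bound].
    delay=None means no watermark: nothing is ever evicted from state.
    """

    def __init__(self, bound, delay=None):
        self.bound = bound
        self.delay = delay
        self.left = defaultdict(list)   # key -> buffered left event times
        self.right = defaultdict(list)  # key -> buffered right event times
        self.max_time = None            # max event time seen on either input

    def _matches(self, l_t, r_t):
        return r_t >= l_t and l_t + self.bound >= r_t

    def process_batch(self, left_rows, right_rows):
        out = []
        # New left rows against right rows buffered by earlier batches.
        for k, l_t in left_rows:
            out += [(k, l_t, r_t) for r_t in self.right[k] if self._matches(l_t, r_t)]
        # New right rows against buffered and new left rows.
        for k, r_t in right_rows:
            lefts = self.left[k] + [t for kk, t in left_rows if kk == k]
            out += [(k, l_t, r_t) for l_t in lefts if self._matches(l_t, r_t)]
        # Buffer the new rows so that future batches can match them.
        for k, t in left_rows:
            self.left[k].append(t)
        for k, t in right_rows:
            self.right[k].append(t)
        self._evict(left_rows + right_rows)
        return out

    def _evict(self, new_rows):
        if self.delay is None:
            return  # no watermark: state only ever grows
        for _, t in new_rows:
            self.max_time = t if self.max_time is None else max(self.max_time, t)
        if self.max_time is None:
            return
        watermark = self.max_time - self.delay
        # A left row can only match right rows up to left_time + bound.
        for k in self.left:
            self.left[k] = [t for t in self.left[k] if t + self.bound >= watermark]
        # A right row can only match left rows with left_time up to right_time.
        for k in self.right:
            self.right[k] = [t for t in self.right[k] if t >= watermark]

    def state_size(self):
        return sum(map(len, self.left.values())) + sum(map(len, self.right.values()))


def simulate(delay, batches=10):
    """Feed one left and one right row per key per batch; return (matches, state size)."""
    join = ToyStreamJoin(bound=10, delay=delay)
    matches = 0
    for i in range(batches):
        left = [(k, 10 * i) for k in ("a", "b")]
        right = [(k, 10 * i + 5) for k in ("a", "b")]
        matches += len(join.process_batch(left, right))
    return matches, join.state_size()


if __name__ == "__main__":
    print(simulate(delay=None))  # (20, 40): state keeps every row ever seen
    print(simulate(delay=5))     # (20, 6): same matches, bounded state
```

&lt;P&gt;Both runs produce the same join output; only the watermark plus the range condition lets the model drop rows that can no longer match, which is the pattern the state-store metrics in the question are missing.&lt;/P&gt;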
&lt;H2 id="recommendations-for-your-scenario" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;Recommendations for Your Scenario&lt;/H2&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;1. Use streaming mode (not availableNow) for continuous ingest&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Only use&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;availableNow=True&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;for batch catch-up; for continuous, classic streaming mode is more memory-efficient.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;2. Always set watermarks in streaming joins&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Example:&lt;/P&gt;
&lt;DIV class="w-full md:max-w-[90vw]"&gt;
&lt;DIV class="codeWrapper text-light selection:text-super selection:bg-super/10 my-md relative flex flex-col rounded font-mono text-sm font-normal bg-subtler"&gt;
&lt;DIV class="translate-y-xs -translate-x-xs bottom-xl mb-xl flex h-0 items-start justify-end md:sticky md:top-[100px]"&gt;
&lt;DIV class="overflow-hidden rounded-full border-subtlest ring-subtlest divide-subtlest bg-base"&gt;
&lt;DIV class="border-subtlest ring-subtlest divide-subtlest bg-subtler"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV class="-mt-xl"&gt;
&lt;DIV&gt;
&lt;DIV class="text-quiet bg-subtle py-xs px-sm inline-block rounded-br rounded-tl-[3px] font-thin" data-testid="code-language-indicator"&gt;python&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;&lt;CODE&gt;df1 &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; df1&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"event_time_col"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;"30 minutes"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
df2 &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; df2&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;withWatermark&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;&lt;SPAN class="token token"&gt;"event_time_col"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; &lt;SPAN class="token token"&gt;"30 minutes"&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
result &lt;SPAN class="token token operator"&gt;=&lt;/SPAN&gt; df1&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;join&lt;SPAN class="token token punctuation"&gt;(&lt;/SPAN&gt;df2&lt;SPAN class="token token punctuation"&gt;,&lt;/SPAN&gt; &lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;.&lt;/SPAN&gt;&lt;SPAN class="token token punctuation"&gt;)&lt;/SPAN&gt;
&lt;/CODE&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Adjust the watermark according to your use case latency/late-arrival tolerances.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
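&lt;P&gt;The example above elides the join condition. A fuller sketch, following the stream-stream join pattern in the Spark Structured Streaming programming guide, adds an event-time range condition so that Spark can evict old state. The function name, the columns id and event_time, and the 1-hour bound are hypothetical; adapt them to your tables.&lt;/P&gt;

```python
def join_with_event_time_bounds(df1, df2):
    """Sketch of a stream-stream inner join whose state Spark can clean up.

    Hypothetical schema: both inputs have a join key `id` and a TimestampType
    event-time column `event_time`. Adjust names, delays and the bound to your data.
    """
    # Imported inside the function only so this sketch can be loaded without Spark.
    from pyspark.sql.functions import col, expr

    left = df1.withWatermark("event_time", "30 minutes").alias("l")
    right = df2.withWatermark("event_time", "30 minutes").alias("r")

    # Key equality plus an event-time range: once the watermark passes a row's
    # range, no future row can match it and Spark can drop it from state.
    condition = expr(
        "l.id = r.id"
        " AND r.event_time >= l.event_time"
        " AND l.event_time + interval 1 hour >= r.event_time"
    )
    joined = left.join(right, condition, "inner")

    # Rename the duplicated columns so the result can be written to a Delta table.
    return joined.select(
        col("l.id").alias("id"),
        col("l.event_time").alias("left_event_time"),
        col("r.event_time").alias("right_event_time"),
    )
```

&lt;P&gt;The returned DataFrame can be written with the same availableNow trigger and checkpoint location as in the question; the watermark and the join state are saved in the checkpoint between runs.&lt;/P&gt;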
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;3. Monitor state store metrics&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;Regularly check&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;memoryUsedBytes&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;and&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;CODE&gt;numRowsTotal&lt;/CODE&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;for each stateful operator. Unbounded growth signals missing watermark or join design issues.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
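&lt;P&gt;A small helper along these lines can automate that check. It is a sketch that works on one progress report as a dict shaped like the JSON in the question (for example json.loads of a progress string, or StreamingQuery.lastProgress in PySpark 3.x; newer versions may return a progress object instead, so check your version). The helper names are hypothetical.&lt;/P&gt;

```python
def state_summary(progress):
    """Summarize the stateful operators in one streaming progress report (a dict)."""
    return [
        {
            "operator": op["operatorName"],
            "rows": op["numRowsTotal"],
            "removed": op["numRowsRemoved"],
            "memory_gb": round(op["memoryUsedBytes"] / 1e9, 2),
        }
        for op in progress.get("stateOperators", [])
    ]


def growing_without_eviction(prev, curr):
    """Names of operators whose state grew between two reports while nothing was removed."""
    return [
        c["operator"]
        for p, c in zip(state_summary(prev), state_summary(curr))
        if c["rows"] > p["rows"] and c["removed"] == 0
    ]


# First symmetricHashJoin operator from the two runs posted in the question.
run1 = {"stateOperators": [{"operatorName": "symmetricHashJoin", "numRowsTotal": 48847,
                            "numRowsRemoved": 0, "memoryUsedBytes": 4246347944}]}
run2 = {"stateOperators": [{"operatorName": "symmetricHashJoin", "numRowsTotal": 61057,
                            "numRowsRemoved": 0, "memoryUsedBytes": 7248908645}]}

if __name__ == "__main__":
    print(state_summary(run2))                   # 61057 rows, about 7.25 GB
    print(growing_without_eviction(run1, run2))  # flags the join operator
```

&lt;P&gt;An operator that keeps appearing in this list run after run is a candidate for a missing watermark or a missing event-time join condition.&lt;/P&gt;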
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;4. Partition/cluster source Delta tables on key columns (for large tables)&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;This optimizes both streaming read and joins.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;STRONG&gt;5. For batch use, use availableNow and checkpointing&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;But be aware this will re-scan all available data, so it’s not for true streaming.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
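&lt;P&gt;Recommendations 1 and 5 differ mainly in the trigger. The sketch below shows both options side by side; write_incremental and its always_on flag are hypothetical names, the 1-minute interval is an arbitrary example, and availableNow requires PySpark 3.3 or later. In both modes the checkpoint is what makes each run incremental.&lt;/P&gt;

```python
def write_incremental(result_df, target_table, checkpoint_path, always_on=False):
    """Sketch: write a streaming DataFrame to a Delta table with either trigger.

    always_on=False: batch-style run (availableNow) that processes everything added
    since the last checkpoint, then stops. Suits scheduled jobs.
    always_on=True: long-running query with a micro-batch every minute.
    """
    writer = (
        result_df.writeStream.format("delta")
        .outputMode("append")
        # Source offsets, the watermark and join state are all kept here between runs.
        .option("checkpointLocation", checkpoint_path)
    )
    if always_on:
        writer = writer.trigger(processingTime="1 minute")
    else:
        writer = writer.trigger(availableNow=True)
    query = writer.toTable(target_table)
    if not always_on:
        query.awaitTermination()  # returns once the available data has been processed
    return query
```
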
&lt;H2 id="relevant-documentation--key-options" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;Relevant Documentation &amp;amp; Key Options&lt;/H2&gt;
&lt;UL class="marker:text-quiet list-disc"&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;[Delta streaming source documentation, start options, and watermarks]&lt;/P&gt;
&lt;/LI&gt;
&lt;LI class="py-0 my-0 prose-p:pt-0 prose-p:mb-2 prose-p:my-0 [&amp;amp;&amp;gt;p]:pt-0 [&amp;amp;&amp;gt;p]:mb-2 [&amp;amp;&amp;gt;p]:my-0"&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;&lt;A class="reset interactable cursor-pointer decoration-1 underline-offset-1 text-super hover:underline font-semibold" href="https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/delta-lake#specify-initial-position" target="_blank" rel="nofollow noopener"&gt;&lt;SPAN class="text-box-trim-both"&gt;Databricks: "Best Practices for Using Structured Streaming"&lt;/SPAN&gt;&lt;/A&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;H2 id="summary-table-streaming-join-behavior" class="mb-2 mt-4 font-display font-semimedium text-base first:mt-0 md:text-lg [hr+&amp;amp;]:mt-4"&gt;Summary Table: Streaming Join Behavior&lt;/H2&gt;
&lt;DIV class="group relative"&gt;
&lt;DIV class="w-full overflow-x-auto md:max-w-[90vw] border-subtlest ring-subtlest divide-subtlest bg-transparent"&gt;
&lt;TABLE class="border-subtler my-[1em] w-full table-auto border-separate border-spacing-0 border-l border-t"&gt;
&lt;THEAD class="bg-subtler"&gt;
&lt;TR&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Scenario&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Data Read per Trigger&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Join Memory Growth&lt;/TH&gt;
&lt;TH class="border-subtler p-sm break-normal border-b border-r text-left align-top"&gt;Watermark Effect&lt;/TH&gt;
&lt;/TR&gt;
&lt;/THEAD&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;availableNow=True (with checkpoint)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Data added since the last run&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Same as streaming: depends on watermarks (state is kept in the checkpoint)&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Same as streaming&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Streaming, no watermark&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Only new data; all past rows stay in join state&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Unbounded&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;None&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Streaming + watermarks and an event-time join condition&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Only new data&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Bounded&lt;/TD&gt;
&lt;TD class="px-sm border-subtler min-w-[48px] break-normal border-b border-r"&gt;Old state is evicted&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;/DIV&gt;
&lt;DIV class="bg-base border-subtler shadow-subtle pointer-coarse:opacity-100 right-xs absolute bottom-0 flex rounded-lg border opacity-0 transition-opacity group-hover:opacity-100 [&amp;amp;&amp;gt;*:not(:first-child)]:border-subtle [&amp;amp;&amp;gt;*:not(:first-child)]:border-l"&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="flex"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;P class="my-2 [&amp;amp;+p]:mt-4 [&amp;amp;_strong:has(+br)]:inline-block [&amp;amp;_strong:has(+br)]:pb-2"&gt;If you continue joins&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;without watermarks&lt;/STRONG&gt;, expect state store memory to grow linearly with data size, which is unsustainable. Set watermarks and optimize triggers for production.&lt;/P&gt;</description>
      <pubDate>Wed, 12 Nov 2025 16:53:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138813#M51020</guid>
      <dc:creator>mark_ott</dc:creator>
      <dc:date>2025-11-12T16:53:28Z</dc:date>
    </item>
    <item>
      <title>Re: delta as streaming source, can the reader reads only newly appended rows?</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138826#M51023</link>
      <description>&lt;P&gt;Hello &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/82205"&gt;@mark_ott&lt;/a&gt;&amp;nbsp;&lt;BR /&gt;Thanks for the explanation. Very clear and constructive.&amp;nbsp;&lt;BR /&gt;Our design leverages Structured Streaming, but &lt;STRONG&gt;for batch use, with availableNow and checkpointing.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;The scenario I described occurs in the Gold layer. After some reading and testing, I decided to switch to a Declarative Pipelines materialized view instead of inner joining multiple streaming tables. I am still in the middle of modifying and testing the code.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Comments and advice are always welcome.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks again for the clarification.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Na.&lt;/P&gt;</description>
      <pubDate>Wed, 12 Nov 2025 17:21:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-as-streaming-source-can-the-reader-reads-only-newly/m-p/138826#M51023</guid>
      <dc:creator>cdn_yyz_yul</dc:creator>
      <dc:date>2025-11-12T17:21:56Z</dc:date>
    </item>
  </channel>
</rss>

