Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta as streaming source: can the reader read only newly appended rows?

cdn_yyz_yul
New Contributor III

Hello everyone,

In our implementation of the Medallion Architecture, we want to stream changes with Spark Structured Streaming. I would like some advice on how to use a Delta table as a source correctly, and whether there is a performance (memory usage) concern in the long run.

Summary of the scenario:

Source: Delta table, append-only

# to read from source
df =  spark.readStream.format("delta").table("table_name")

Sink: Delta table

# to write to sink
(df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "location")
    .trigger(availableNow=True)
    .table("target_table_name"))

 

Steps used during testing:

(the final implementation has code that performs the same sequence of operations)

  • Read three delta tables as df1, df2, df3.
  • Inner join them without setting watermark or window constraints.
  • Write to the target delta table (a sketch of this pipeline follows below).
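
For concreteness, a minimal sketch of the pipeline described above (table names, join key, and checkpoint path are placeholders, not the actual implementation):

# Read the three append-only Delta tables as streaming DataFrames
df1 = spark.readStream.format("delta").table("bronze.table_1")
df2 = spark.readStream.format("delta").table("bronze.table_2")
df3 = spark.readStream.format("delta").table("bronze.table_3")

# Inner join on a shared key, with no watermark or window constraints
joined = df1.join(df2, "id").join(df3, "id")

# Write the joined result to the target Delta table
(joined.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_target")
    .trigger(availableNow=True)
    .table("silver.target_table"))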

 

  1. Observed that df1-df3 each contain all the rows of their respective delta tables after each trigger.

For example, in a clean environment where the source tables contain x1, x2, and x3 rows, on the initial run the three DataFrames contain x1, x2, and x3 rows respectively. The join then completes and the result is written to the target.

2. After appending new rows to the source Delta tables and triggering the stream again, df1-df3 each contain the rows of the entire source table, i.e., x + new; the join and write then complete as before.

 

It is true that the streamWriter writes only the updates to the sink. But will the join take more and more memory and process more and more rows as the data grows?

 

Databricks doc says:

You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.

startingVersion: The Delta Lake version to start from. Databricks recommends omitting this option for most workloads. When not set, the stream starts from the latest available version including a complete snapshot of the table at that moment and future changes as change data.

https://learn.microsoft.com/en-us/azure/databricks/structured-streaming/delta-lake#specify-initial-p...
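
For illustration, a minimal sketch of passing that option (the version number and table name are placeholders; startingTimestamp is the timestamp-based alternative):

# Start reading from a specific Delta table version instead of the full snapshot
df = (spark.readStream
    .format("delta")
    .option("startingVersion", "5")
    .table("table_name"))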

 

The initial run:
   "stateOperators": [
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 48847,
            "numRowsUpdated": 48847,
            "allUpdatesTimeMs": 324994,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 5,
            "commitTimeMs": 50117,
            "memoryUsedBytes": 4246347944,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics":...},

        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 48858,
            "numRowsUpdated": 48858,
            "allUpdatesTimeMs": 325977,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 0,
            "commitTimeMs": 54268,
            "memoryUsedBytes": 1048889302,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics":.....
During the second run:

    "stateOperators": [
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 61057,
            "numRowsUpdated": 12210,
            "allUpdatesTimeMs": 352948,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 5,
            "commitTimeMs": 63948,
            "memoryUsedBytes": 7248908645,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,
            "customMetrics": {...}
        },
        {
            "operatorName": "symmetricHashJoin",
            "numRowsTotal": 61073,
            "numRowsUpdated": 12215,
            "allUpdatesTimeMs": 413147,
            "numRowsRemoved": 0,
            "allRemovalsTimeMs": 7,
            "commitTimeMs": 73142,
            "memoryUsedBytes": 8876238984,
            "numRowsDroppedByWatermark": 0,
            "numShufflePartitions": 200,
            "numStateStoreInstances": 800,......

Your advice is appreciated.
Na.

3 REPLIES

bianca_unifeye
New Contributor II

First of all, you are using append-only reads, which means that every time your stream triggers, Spark will process the entire Delta snapshot rather than just the changes.
That's why you're observing the memory usage increase after each run; it's not a bug, it's how Spark Structured Streaming works under the hood.

Instead, use Delta Change Data Feed (CDF)

Change Data Feed (CDF) is a built-in Delta feature that lets you read only the changes (inserts, updates, deletes) instead of the full dataset.
When you enable it, Spark treats the Delta table as a true incremental stream source.

https://docs.databricks.com/aws/en/delta/delta-change-data-feed
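
For illustration, a minimal sketch of enabling CDF on an existing table and then reading only the change rows as a stream (table name is a placeholder):

# One-time DDL to enable the change data feed on an existing Delta table
spark.sql("""
    ALTER TABLE myDeltaTable
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read only the change rows (inserts, updates, deletes) as a stream
cdf_df = (spark.readStream
    .option("readChangeFeed", "true")
    .table("myDeltaTable"))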

Thanks @bianca_unifeye 
As the trigger is availableNow, each trigger starts with a new Spark session. Then, when setting the following, the documented behavior is: "By default, the stream returns the latest snapshot of the table when the stream first starts as an INSERT and future changes as change data."

 

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

 

That is to say, enabling CDF or not will not make a difference in this case.

To get only the latest changes, I had to set

 

.option("startingVersion", "latest")
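
For reference, a minimal sketch of how that option fits into the reader (table name is a placeholder):

# Skip the initial snapshot and pick up only rows appended after the stream starts
df = (spark.readStream
    .format("delta")
    .option("startingVersion", "latest")
    .table("table_name"))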

 

But the joining of multiple streaming tables will be problematic.

I will explore how AUTO CDC works and see if it is a better solution than streaming from a Delta table.
Databricks says: "Databricks recommends streaming from the CDC feed of a Delta table (option 1) rather than the Delta table itself (option 2) whenever possible."

Any comments and advice are appreciated.
Na.

mark_ott
Databricks Employee

In your scenario using Medallion Architecture with Delta tables as both streaming source and sink, it is important to understand Spark Structured Streaming behavior and performance characteristics, especially with joins and memory usage. Here is a direct, actionable analysis based on your detailed setup and observed metrics.

How Delta Table as Source Works in Streaming

When you use a Delta table as a streaming source (spark.readStream.format("delta").table("table_name")), Spark Structured Streaming tracks new data files appended to the Delta log. On initial start, unless you specify "startingVersion", Spark reads a full snapshot of the table (all rows). For every subsequent trigger, only new files/data are processed as micro-batch increments.

Key behavior:

  • On the first trigger, the read covers the entire table.

  • On subsequent triggers, it reads only new appended files (new data) since the last checkpoint.

Why Joins Appear to Process All Rows Each Trigger

What you observed:

  • On the initial run, df1-df3 have all current rows, x1, x2, x3. Join + write occur.

  • After appending new rows and triggering, you see df1-df3 now have x1+n, x2+m, x3+k rows, always including all past data.

This happens because:

  • Each trigger's streaming DataFrame by default represents all unprocessed data (since the last checkpoint), but it should not include all rows from the physical table every time unless you configured availableNow=True, in which case you get a bounded (batch-like) execution and a full scan occurs.

  • If you run with .trigger(availableNow=True), the micro-batch will process the full available data each time; this is intended for one-off refreshes, not continuous streaming.

  • For normal .trigger(processingTime='interval'), each trigger sees just new data, leading to much lower processing/memory.

Memory Growth: The Real Concern with Joins

Structured Streaming joins (especially inner/outer) build in-memory state for joined keys. If you do not set watermark or retention duration, the in-memory state grows unbounded, because Spark assumes all historic data may still match a late-arriving record.

  • The state store (see "numRowsTotal", "memoryUsedBytes" in your metrics) accumulates old data.

  • Each symmetric hash join instance holds all seen join keys until it can safely evict data, which only happens with event time watermarks.

Without Watermarks:

  • Memory usage increases with each new batch, as old keys are never "timed out" or removed.

  • This can easily lead to OOM (Out Of Memory) errors or excessive state store growth, especially in large tables.

With Watermarks:

  • Watermarks let Spark know "data older than this is safe to drop from memory," so the state store remains bounded.

  • For append-only tables and inner joins, set watermarks on the event-time column and ideally design joins so only a recent window of keys must be kept in memory.

Recommendations for Your Scenario

1. Use streaming mode (not availableNow) for continuous ingest

  • Only use availableNow=True for batch catch-up; for continuous, classic streaming mode is more memory-efficient.
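
As a sketch (assuming joined is the streaming DataFrame produced by the join; paths, interval, and table names are placeholders), a continuously running query could use a processing-time trigger instead of availableNow:

# Continuous micro-batches: the query stays up, and each trigger picks up
# only files appended since the last checkpointed batch
query = (joined.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_target")
    .trigger(processingTime="1 minute")
    .table("silver.target_table"))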

2. Always set watermarks in streaming joins

  • Example:

    df1 = df1.withWatermark("event_time_col", "30 minutes")
    df2 = df2.withWatermark("event_time_col", "30 minutes")
    result = df1.join(df2, ...)

    Adjust the watermark according to your use case's latency and late-arrival tolerances.
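
    Note that for stream-stream joins specifically, Spark can only evict old state when the join condition also bounds event time. A minimal sketch under that assumption (column names are hypothetical):

    from pyspark.sql import functions as F

    # Watermark both sides, and add an event-time range to the join condition
    # so state older than the range can be dropped
    left = df1.withWatermark("left_event_time", "30 minutes")
    right = df2.withWatermark("right_event_time", "30 minutes")

    result = left.join(
        right,
        F.expr("""
            left_id = right_id AND
            right_event_time >= left_event_time AND
            right_event_time <= left_event_time + interval 30 minutes
        """),
        "inner",
    )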

3. Monitor state store metrics

  • Regularly check memoryUsedBytes and numRowsTotal for each stateful operator. Unbounded growth signals missing watermark or join design issues.
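
One way to check these metrics programmatically, assuming query is the StreamingQuery handle returned by writeStream (a sketch, not the only approach):

# Inspect state-store metrics from the most recent micro-batch
progress = query.lastProgress
if progress:
    for op in progress["stateOperators"]:
        print(op["operatorName"], op["numRowsTotal"], op["memoryUsedBytes"])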

4. Partition/cluster source Delta tables on key columns (for large tables)

  • This optimizes both streaming read and joins.
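
As an illustration only (column, path, and table names are hypothetical), a table written with a partition column lets downstream streaming reads and joins prune files:

# Partition the written Delta table by a date column
(joined.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_target")
    .partitionBy("event_date")
    .trigger(availableNow=True)
    .table("silver.target_table"))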

5. For batch use, use availableNow and checkpointing

  • But be aware this will re-scan all available data, so it's not for true streaming.


Summary Table: Streaming Join Behavior

Scenario                | Data Read per Trigger   | Join Memory Growth | Watermark Effect
availableNow=True       | All data (batch)        | High (entire set)  | N/A (batch only)
Streaming, no watermark | New + old (no eviction) | Unbounded/high     | None
Streaming + watermark   | Only new data           | Bounded            | Significant
 
 

If you continue joins without watermarks, expect state store memory to grow linearly with data size, which is unsustainable. Set watermarks and optimize triggers for production.