Monday
Hello everyone,
In our implementation of the Medallion Architecture, we want to stream changes with Spark Structured Streaming. I would like some advice on how to use a Delta table as a streaming source correctly, and whether there is a performance (memory usage) concern in the long run.
Summary of the scenario:
Source: Delta table, append-only
# to read from the source
df = spark.readStream.format("delta").table("table_name")

Sink: Delta table

# to write to the sink
(df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "location")
    .trigger(availableNow=True)
    .table("target_table_name"))
Steps used during testing:
(the final implementation runs the same sequence of operations)
1. In a clean environment where the source tables contain x1, x2, x3 rows, do the initial run; the three DataFrames (df1-df3) contain x1, x2, x3 rows respectively. Continue and finish the join and write to the target.
2. Append new rows to the source Delta tables and trigger the stream read; observed that df1-df3 each contain the rows of the entire source table, i.e., x + new. Continue and finish.
It is true that the streamWriter writes only the updates to the sink. But will the join take more and more memory and process more and more rows as the data grows?
Databricks doc says:
You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.
startingVersion: The Delta Lake version to start from. Databricks recommends omitting this option for most workloads. When not set, the stream starts from the latest available version including a complete snapshot of the table at that moment and future changes as change data.
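For reference, a minimal sketch of that option (the version number below is a placeholder; the option also accepts "latest"):
# a sketch, assuming the starting point should be pinned instead of reading the full snapshot
df = (spark.readStream
      .format("delta")
      .option("startingVersion", "5")   # placeholder version; pick one from DESCRIBE HISTORY, or "latest"
      .table("table_name"))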
The initial run:
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 48847,
"numRowsUpdated": 48847,
"allUpdatesTimeMs": 324994,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 5,
"commitTimeMs": 50117,
"memoryUsedBytes": 4246347944,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 800,
"customMetrics":...},
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 48858,
"numRowsUpdated": 48858,
"allUpdatesTimeMs": 325977,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 54268,
"memoryUsedBytes": 1048889302,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 800,
"customMetrics": ...

During the second run:
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 61057,
"numRowsUpdated": 12210,
"allUpdatesTimeMs": 352948,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 5,
"commitTimeMs": 63948,
"memoryUsedBytes": 7248908645,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 800,
"customMetrics": {...}
},
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 61073,
"numRowsUpdated": 12215,
"allUpdatesTimeMs": 413147,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 7,
"commitTimeMs": 73142,
"memoryUsedBytes": 8876238984,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 800, ...

Your advice is appreciated.
Na.
yesterday
First of all, you are using append-only reads, which means that every time your stream triggers, Spark will process the entire Delta snapshot rather than just the changes.
That's why you're observing the memory usage increase after each run; it's not a bug, it's how Spark Structured Streaming works under the hood.
Change Data Feed (CDF) is a built-in Delta feature that lets you read only the changes (inserts, updates, deletes) instead of the full dataset.
When you enable it, Spark treats the Delta table as a true incremental stream source.
https://docs.databricks.com/aws/en/delta/delta-change-data-feed
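A minimal sketch of what that looks like, assuming CDF still needs to be enabled on your source table (the table name is a placeholder):
# one-time setup: enable Change Data Feed on the source table (assumption: not yet enabled)
spark.sql("ALTER TABLE table_name SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# then read only the changes (inserts, updates, deletes) as a stream
df = (spark.readStream
      .option("readChangeFeed", "true")
      .table("table_name"))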
2 hours ago
Thanks @bianca_unifeye
As the trigger is availableNow, each run starts with a new Spark session. Then, when setting the following, the documented behavior still applies: "By default, the stream returns the latest snapshot of the table when the stream first starts as an INSERT and future changes as change data".
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
That is to say, enabling CDF or not will not make a difference in this case.
To get only the latest changes, I had to set
.option("startingVersion", "latest")
But, the joining of multiple streaming tables will be problematic.
I will explore how AUTO CDC works, and see if it is a better solution than streaming from a Delta table.
Databricks says: Databricks recommends streaming from the CDC feed of a Delta table (option 1) rather than the Delta table itself (option 2) whenever possible.
Any comments and advice are appreciated.
Na.
11m ago
In your scenario using Medallion Architecture with Delta tables as both streaming source and sink, it is important to understand Spark Structured Streaming behavior and performance characteristics, especially with joins and memory usage. Here is a direct, actionable analysis based on your detailed setup and observed metrics.
When you use a Delta table as a streaming source (spark.readStream.format("delta").table("table_name")), Spark Structured Streaming tracks new data files appended to the Delta log. On initial start, unless you specify "startingVersion", Spark reads a full snapshot of the table (all rows). For every subsequent trigger, only new files/data are processed as micro-batch increments.
Key behavior:
On the first trigger, the read covers the entire table.
On subsequent triggers, it reads only new appended files (new data) since the last checkpoint.
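If an individual increment is still large, the amount of data per micro-batch can also be capped with the Delta source rate-limit options (a sketch; the values below are placeholders):
# assumption: limit how much new data each micro-batch pulls from the Delta source
df = (spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", 100)      # at most 100 new files per micro-batch
      .option("maxBytesPerTrigger", "1g")     # soft cap on bytes per micro-batch
      .table("table_name"))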
What you observed:
On the initial run, df1-df3 have all current rows, x1, x2, x3. Join and write occur.
After appending new rows and triggering again, you see df1-df3 now have x1+n, x2+m, x3+k rows, always including all past data.
This happens because:
Each trigger's streaming DataFrame by default represents all unprocessed data since the last checkpoint; it should not include all rows from the physical table every time, unless you configured availableNow=True, in which case you get a bounded (batch-like) execution and a full scan occurs.
If you run with .trigger(availableNow=True), the micro-batch will process the full available data each time; this is intended for one-off refreshes, not continuous streaming.
For normal .trigger(processingTime='interval'), each trigger sees just new data, leading to much lower processing/memory.
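As a sketch of the difference (checkpoint path and interval are placeholders; pick one trigger or the other, not both):
# catch-up style: process everything currently available, then stop
(df.writeStream.format("delta")
   .outputMode("append")
   .option("checkpointLocation", "location")
   .trigger(availableNow=True)
   .table("target_table_name"))

# continuous style: a micro-batch of only the new data every minute
(df.writeStream.format("delta")
   .outputMode("append")
   .option("checkpointLocation", "location")
   .trigger(processingTime="1 minute")
   .table("target_table_name"))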
Structured Streaming joins (especially inner/outer) build in-memory state for joined keys. If you do not set watermark or retention duration, the in-memory state grows unbounded, because Spark assumes all historic data may still match a late-arriving record.
The state store (see "numRowsTotal", "memoryUsedBytes" in your metrics) accumulates old data.
Each symmetric hash join instance holds all seen join keys until it can safely evict data, which only happens with event time watermarks.
Memory usage increases with each new batch, as old keys are never "timed out" or removed.
This can easily lead to OOM (Out Of Memory) errors or excessive state store growth, especially in large tables.
Watermarks let Spark know "data older than this is safe to drop from memory," so the state store remains bounded.
For append-only tables and inner joins, set watermarks on the event-time column and ideally design joins so only a recent window of keys must be kept in memory.
1. Use streaming mode (not availableNow) for continuous ingest
Only use availableNow=True for batch catch-up; for continuous ingest, classic streaming mode is more memory-efficient.
2. Always set watermarks in streaming joins
Example:
df1 = df1.withWatermark("event_time_col", "30 minutes")
df2 = df2.withWatermark("event_time_col", "30 minutes")
result = df1.join(df2, ...)
Adjust the watermark according to your use case latency/late-arrival tolerances.
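For a stream-stream join specifically, Spark can only evict join state if, in addition to the watermarks, the join condition constrains the two event-time columns relative to each other. A sketch under assumed, placeholder column names:
from pyspark.sql import functions as F

# assumption: each stream has its own event-time column and a shared business key
left = df1.withWatermark("left_event_time", "30 minutes")
right = df2.withWatermark("right_event_time", "30 minutes")

# the time-range condition plus the watermarks bound how long join state is kept
result = left.join(
    right,
    F.expr("""
        join_key_left = join_key_right AND
        right_event_time BETWEEN left_event_time - INTERVAL 30 MINUTES
                             AND left_event_time + INTERVAL 30 MINUTES
    """),
    "inner",
)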
3. Monitor state store metrics
Regularly check memoryUsedBytes and numRowsTotal for each stateful operator. Unbounded growth signals missing watermark or join design issues.
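One way to pull these numbers programmatically, assuming query is the StreamingQuery handle returned when the stream is started:
# a sketch: inspect the state store metrics of the most recent micro-batch
progress = query.lastProgress
if progress:
    for op in progress.get("stateOperators", []):
        print(op["operatorName"], op["numRowsTotal"], op["memoryUsedBytes"])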
4. Partition/cluster source Delta tables on key columns (for large tables)
This optimizes both streaming read and joins.
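As a sketch of one way to do this when the table is produced upstream (the partition column is a placeholder; date-style columns are a common choice):
# assumption: 'event_date' exists on the DataFrame and has reasonable cardinality
(df.writeStream.format("delta")
   .outputMode("append")
   .option("checkpointLocation", "location")
   .partitionBy("event_date")
   .table("table_name"))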
5. For batch use, use availableNow and checkpointing
But be aware this will re-scan all available data, so it's not for true streaming.
[Delta streaming source documentation, start options, and watermarks]
| Scenario | Data Read per Trigger | Join Memory Growth | Watermark Effect |
|---|---|---|---|
| availableNow True | All data (batch) | High (entire set) | N/A (batch only) |
| Streaming, no watermark | New + old (no eviction) | Unbounded/high | None |
| Streaming + watermark | Only new data | Bounded | Significant |
If you continue joins without watermarks, expect state store memory to grow linearly with data size, which is unsustainable. Set watermarks and optimize triggers for production.