Streaming Live Table - What is actually computed?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-09-2024 09:30 PM
Can anyone please share in a DLT or structured streaming task, what group of rows are computed?
Specific scenarios:
1. when a streaming table A joining a delta table B. Is each of the minibatches in A joining the whole delta table? Does Spark compute the joining from each minibatch with the whole table B?
2. when a streaming table A joining another streaming table B. Does Spark compute the joining from only the new minibatch in A with minibatch in B? or the whole table A is joining the whole table B?
Thanks
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-10-2024 12:05 AM
Hi @tliuzillow ,
1. Stream-static Join: Each minibatch from the streaming table (A) is joined with the entire Delta table (B).
2. Stream-stream Join: Each minibatch from the streaming table(A) is joined with minibatch from the streaming table(B).
However, as per documentation "the challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. "
This is why Spark can also keep the historical data in the buffer, which allows to match incoming data with past records, thus ensuring complete join results.
To implement this, you will use watermarking. Here is the code sample from the above documentation:
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)

