Update destination table when using Spark Structured Streaming and Delta tables
08-23-2023 04:56 PM - edited 08-24-2023 06:10 AM
I'm trying to implement a streaming pipeline that will run hourly using Spark Structured Streaming, Scala and Delta tables. The pipeline will process different items with their details.
The sources are Delta tables that already exist, written hourly using writeStream. The output should be another Delta table that takes data from the source table, performs some transformations and writes the result to the destination table.
The problem I'm facing is that at different points in time, the source table will bring new versions of items that were processed in the past (these are not duplicated messages, just an updated version of the same item). For these cases I need to update the item in the destination table in order to keep only the latest version.
Additionally, in some cases I need to use two streaming tables as the source and join them, which blocks me from using "foreachBatch".
According to this, Structured Streaming can only be used in "append" mode, but for my use case I would need to update the data when writing.
Is there a way to make this work?
I feel this should be a pretty common scenario that many streaming implementations have to face at some point, but I wasn't able to find a way around it or any other published solutions so far.
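For reference, this is roughly what the current append-only pipeline looks like (a minimal sketch only; the table names, checkpoint path and trigger are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().getOrCreate()

// Stream from the existing source Delta table (written hourly upstream)
val source = spark.readStream
  .format("delta")
  .table("source_items") // hypothetical table name

// Some per-item transformations
val transformed = source
  .withColumn("processed_at", current_timestamp())

// Current approach: append-only write to the destination Delta table.
// This keeps every version of an item instead of only the latest one.
transformed.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoints/destination_items") // hypothetical path
  .trigger(Trigger.AvailableNow())
  .toTable("destination_items") // hypothetical table name
```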
08-24-2023 06:05 AM
Hello @Mo, thanks for the answer!
I've considered using foreachBatch, but there are 2 issues:
- In some cases I need to use two streaming tables as the source and join them. I believe this is not supported by foreachBatch?
- Since I would be using merge to write the output, would it still be possible to stream from the output table?
Do you know if there is a way to resolve these? A rough sketch of what I'm trying to do is below.
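Something along these lines is what I have in mind (a minimal sketch only, assuming both streams share an item_id key; the table names and checkpoint path are made up, and the destination table would already exist):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// Two streaming Delta sources to be joined (hypothetical table names)
val items   = spark.readStream.format("delta").table("items")
val details = spark.readStream.format("delta").table("item_details")

// Stream-stream inner join on the item key
val joined = items.join(details, Seq("item_id"))

// Upsert each micro-batch so the destination keeps only the latest version per item
val upsertBatch: (DataFrame, Long) => Unit = (batch, _) => {
  DeltaTable.forName(spark, "destination_items").as("t")
    .merge(batch.as("s"), "t.item_id = s.item_id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

joined.writeStream
  .foreachBatch(upsertBatch)
  .option("checkpointLocation", "/checkpoints/destination_items") // hypothetical path
  .start()
```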
08-24-2023 01:58 PM
Hey @Mo
I was under the impression that stream-to-stream joins couldn't be performed when using "foreachBatch".
Is this not the case?
Also, this part of the documentation specifies that stream-stream joins can only be used with "append" output mode.
So it seems like it won't work with the merge approach.
08-24-2023 09:30 PM
Could you try using CDC in Delta? You could use readChangeFeed to read only the changes that were applied to the source table. This is also explained here:
https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed
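For example, something like this would stream only the changed rows (a minimal sketch, assuming the change data feed has been enabled on the source table; the table name is made up):

```scala
// The source table must have been created/altered with
// TBLPROPERTIES (delta.enableChangeDataFeed = true)
val changes = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .table("source_items") // hypothetical table name

// Each row also carries _change_type, _commit_version and _commit_timestamp,
// which can be used downstream to keep only the latest version of each item.
```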

