
Update destination table when using Spark Structured Streaming and Delta tables

Agus1
New Contributor III

I'm trying to implement a streaming pipeline that will run hourly using Spark Structured Streaming, Scala and Delta tables. The pipeline will process different items with their details.

The sources are Delta tables that already exist and are written hourly using "writeStream". The output should be another Delta table: the pipeline takes data from the source table, performs some transformations and writes the result to the destination table.

The problem I'm facing is that, at different moments in time, the source table will bring new versions of items that were processed in the past (these are not duplicated messages, just an updated version of the same item). For these cases I need to update the item in the destination table in order to keep only the latest version.
Additionally, for some cases I need to use 2 streaming tables as sources and join them, which I believe blocks me from using "foreachBatch".

According to this, Structured Streaming can only be used in "append" mode, but for my use case I would need to update the data when writing.

Is there a way to make this work?

I feel that this should be a pretty common scenario that many streaming implementations will have to face at some point, but I wasn't able to find a way around it or any other published solutions so far.

3 REPLIES

Agus1
New Contributor III

Hello @Mo, thanks for the answer!

I've considered using foreachBatch, but there are 2 issues:

  • For some cases I need to use 2 streaming tables as sources and join them. I believe this is not supported by foreachBatch?
  • Since I would be using merge for writing the output, would it still be possible to stream from the output table?

Do you know if there is a way to resolve these?

Agus1
New Contributor III

Hey @Mo 

I was under the impression that stream-to-stream joins can't be performed when using "foreachBatch".
Is this not the case?

Also, this part of the documentation specifies that joins can only be used with "append" mode.
So it seems like it won't work with the merge approach 😕
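
For reference, the two constraints do not necessarily conflict: the stream-stream join itself runs in "append" mode, and "foreachBatch" only receives each micro-batch of already-joined rows as a regular DataFrame, where a Delta MERGE can apply the update. Below is a minimal sketch of that combination, not a tested implementation. The table names (`items`, `item_details`, `items_latest`), the columns (`item_id`, `updated_at`, `detail_updated_at`), the watermark and join intervals, and the checkpoint path are all assumptions made for illustration, and `Trigger.AvailableNow()` assumes Spark 3.3 or later.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, expr, row_number}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.getOrCreate()

// Two streaming Delta sources (hypothetical table and column names).
// Watermarks let Spark drop old join state.
val items = spark.readStream.table("items")
  .withWatermark("updated_at", "2 hours")

val details = spark.readStream.table("item_details")
  .withColumnRenamed("item_id", "detail_item_id") // avoid an ambiguous key name
  .withWatermark("detail_updated_at", "2 hours")

// Inner stream-stream join; this part of the query is append-only.
val joined = items.join(details, expr("""
  item_id = detail_item_id AND
  detail_updated_at BETWEEN updated_at - INTERVAL 1 HOUR
                        AND updated_at + INTERVAL 1 HOUR
""")).drop("detail_item_id")

// Runs once per micro-batch on a plain (non-streaming) DataFrame.
def upsertLatest(batch: DataFrame, batchId: Long): Unit = {
  // Keep only the newest version of each item within this micro-batch,
  // so MERGE never sees two source rows for the same key.
  val byItem = Window.partitionBy("item_id").orderBy(col("updated_at").desc)
  val latest = batch
    .withColumn("rn", row_number().over(byItem))
    .where(col("rn") === 1)
    .drop("rn")

  DeltaTable.forName(batch.sparkSession, "items_latest").as("t")
    .merge(latest.as("s"), "t.item_id = s.item_id")
    .whenMatched("s.updated_at > t.updated_at").updateAll() // newer version wins
    .whenNotMatched().insertAll()
    .execute()
}

joined.writeStream
  .foreachBatch(upsertLatest _)
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/items_latest") // hypothetical path
  .trigger(Trigger.AvailableNow()) // process what is available, then stop (hourly job)
  .start()
```

The time-range condition in the join is only one possible choice; rows that fall outside it will not be matched, so the intervals would need to reflect how late the detail records can actually arrive.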

Tharun-Kumar
Honored Contributor II

@Agus1 

Could you try using the Change Data Feed (CDC) in Delta? You can use the readChangeFeed option to read only the changes that were applied to the source table. This is also explained here:

https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed
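
A minimal sketch of what reading the change feed as a stream could look like, assuming the table written with merge is called `items_latest` (a hypothetical name) and that the change data feed has been enabled on it. Changes are only recorded from the point the property is set, so earlier versions are not available through the feed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.getOrCreate()

// One-time setup: enable the change data feed on the table (hypothetical name).
spark.sql(
  "ALTER TABLE items_latest SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

// Stream row-level changes instead of the table contents.
// Each row carries _change_type, _commit_version and _commit_timestamp.
val changes = spark.readStream
  .option("readChangeFeed", "true")
  .table("items_latest")
  // Keep new rows and the post-update image of changed rows;
  // this drops update_preimage and delete records.
  .where(col("_change_type").isin("insert", "update_postimage"))
```

The resulting `changes` stream can then be transformed and written like any other streaming DataFrame, which is one way to consume a table that is updated with merge rather than appended to.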
