I'm trying to use Structured Streaming in Scala to stream from a Delta table that is a dump of a Kafka topic. Each record/message in the dump is an update of attributes for its key, no messages from Kafka are dropped, and the value is flattened into its own columns. I have multiple jobs representing different entities, and some of them require joining two of these dump tables together with a stream-stream join. As an example, the first topic holds attributes of a product (key is product_id, value contains attributes like name, brand, size), and we want to join it to the dump of a second topic holding prices for the product (key is product_id, value contains attributes like price, valid from, valid to), so that the final gold output table has attributes from both topics.
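
For context, the kind of join I have in mind looks roughly like the sketch below. This is only an illustration: the table names `product_dump` and `price_dump`, the watermark durations, and the 2-hour join bound are placeholders, not our real tables or SLAs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().appName("product-price-join").getOrCreate()

// Stream from the two Kafka-dump Delta tables (placeholder table names)
val products = spark.readStream
  .table("product_dump")
  .withWatermark("timestamp", "2 hours")

val prices = spark.readStream
  .table("price_dump")
  .withColumnRenamed("key", "price_key")             // avoid ambiguous column names in the join
  .withColumnRenamed("timestamp", "price_timestamp")
  .withWatermark("price_timestamp", "2 hours")

// Stream-stream inner join on the product id, bounded on event time
val joined = products.join(
  prices,
  expr("""
    key = price_key AND
    price_timestamp BETWEEN timestamp - INTERVAL 2 HOURS
                        AND timestamp + INTERVAL 2 HOURS
  """)
)
```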
Example Source Table Schema
| key | timestamp | offset | partition | value_name | value_brand | ... |
|-----|-----------|--------|-----------|------------|-------------|-----|
| 1 | 2023-10-11T01:00:00 | 12344 | 1 | apple pie | BakeryA | ... |
| 1 | 2023-10-11T01:30:00 | 12345 | 1 | Apple Pie | BakeryA | ... |
My streaming job should run hourly and keep only the latest record for each key (i.e. only the latest attributes of the product). In our current batch pipeline we do this with a Spark window function partitioned by key and ordered by timestamp and offset. To reuse the same logic in a streaming pipeline, we would need to use .foreachBatch and MERGE INTO to keep only the latest updates; however, I then cannot do stream-stream joins inside foreachBatch.
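
For reference, the dedup logic in the batch pipeline is roughly the following (`dump` here stands for a DataFrame read from one of the dump tables):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the newest record per key, breaking ties by offset
val latestPerKey = dump
  .withColumn(
    "rn",
    row_number().over(
      Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)
    )
  )
  .filter(col("rn") === 1)
  .drop("rn")
```

As far as I understand, a non-time-based window function like this is not supported directly on a streaming DataFrame, which is why foreachBatch comes into play.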
To work around the conflict between foreachBatch and stream-stream joins, I thought of creating an intermediate table by streaming from the dump and keeping only the latest record per key; this would also help with auditability and with topics that feed multiple gold tables. The downside is that I cannot then stream from the intermediate table into the gold table, since the input table of a streaming query is expected to be append-only, and the intermediate table would be maintained with MERGE INTO.
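
What I have in mind for that intermediate table is roughly the sketch below (assuming `spark` is the active SparkSession). Again, the table names (`product_dump`, `product_latest`), the checkpoint path, and the use of Trigger.AvailableNow for an hourly scheduled run are assumptions for illustration, not a working solution:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.streaming.Trigger

// Upsert the newest record per key from each micro-batch into the intermediate table
def upsertLatest(batch: DataFrame, batchId: Long): Unit = {
  val latest = batch
    .withColumn(
      "rn",
      row_number().over(
        Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)
      )
    )
    .filter(col("rn") === 1)
    .drop("rn")

  DeltaTable.forName("product_latest") // placeholder intermediate table
    .as("t")
    .merge(latest.as("s"), "t.key = s.key")
    .whenMatched("s.timestamp >= t.timestamp").updateAll() // never overwrite with older data
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream
  .table("product_dump") // placeholder dump table
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) => upsertLatest(batch, batchId) }
  .option("checkpointLocation", "/tmp/checkpoints/product_latest") // placeholder path
  .trigger(Trigger.AvailableNow())
  .start()
```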
To summarize:
- How can I get the latest message per key while also doing a stream-stream join, either in one job or across multiple jobs?
- Is there any workaround for building an intermediate table with MERGE INTO (i.e. not append-only) and then running a streaming job that sources from it?