
Structured Streaming from a Delta table that is a dump of Kafka and getting the latest record per key

liv1
New Contributor II

I'm trying to use Structured Streaming in Scala to stream from a Delta table that is a dump of a Kafka topic, where each record/message is an update of attributes for its key, no messages from Kafka are dropped from the dump, and the value is flattened into its own columns. I have multiple jobs representing different entities, and some require joining two of these dump tables together with a stream-stream join. For example, the first topic holds attributes of a product (key is product_id, value contains attributes like name, brand, size) and we want to join it to the dump of a second topic of prices for the product (key is product_id, value contains attributes like price, valid from, valid to), so that our final gold output table has attributes from both topics.

Example Source Table Schema

| key | timestamp | offset | partition | value_name | value_brand | ... |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 2023-10-11T01:00:00 | 12344 | 1 | apple pie | BakeryA | ... |
| 1 | 2023-10-11T01:30:00 | 12345 | 1 | Apple Pie | BakeryA | ... |
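Roughly what I mean by the stream-stream join, as a simplified sketch (the table names are placeholders, and the watermark/time-bound details are omitted):

```scala
import org.apache.spark.sql.functions.expr

// Placeholder table names; both are Delta dumps of Kafka topics keyed by product_id.
val products = spark.readStream.table("bronze.product_attributes_dump")
val prices   = spark.readStream.table("bronze.product_prices_dump")

// Inner stream-stream join on the Kafka key. In practice both sides would also
// need watermarks and a time-range condition so Spark can clean up join state.
val joined = products.alias("p").join(
  prices.alias("pr"),
  expr("p.key = pr.key")
)
```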

My streaming job should run hourly and get only the latest record for each key (to get only the latest attributes of the product). Currently, in our batch pipeline, we use a Spark window function ordered by timestamp and offset and partitioned by key. To use the same logic in a streaming pipeline, we would need to use .foreachBatch and MERGE INTO to keep only the updates; however, I then cannot do stream-stream joins inside foreachBatch.
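The batch version looks roughly like this (a sketch with placeholder table and column names):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Latest record per key: rank rows within each key by timestamp and offset
// descending, then keep only the top-ranked row.
val latestPerKey = spark.read.table("bronze.product_attributes_dump")
  .withColumn("rn", row_number().over(
    Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)))
  .filter(col("rn") === 1)
  .drop("rn")
```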

To work around the conflict between foreachBatch and stream-stream joins, I thought of creating an intermediate table that streams from the dump and keeps only the latest record per key, which would also help with auditability and with some topics being used for multiple gold tables. The downside is that I then cannot stream from the intermediate table into the gold table, since a table used as the source of a streaming query must be written in append mode only.
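The intermediate table I have in mind would be maintained with something like this sketch (table names and checkpoint path are placeholders; it assumes the Delta Lake Scala API is available):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Upsert the latest record per key from each micro-batch into the silver table.
def upsertLatest(microBatch: DataFrame, batchId: Long): Unit = {
  val latest = microBatch
    .withColumn("rn", row_number().over(
      Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)))
    .filter(col("rn") === 1)
    .drop("rn")

  DeltaTable.forName("silver.product_attributes_latest").as("t")
    .merge(latest.as("s"), "t.key = s.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream.table("bronze.product_attributes_dump")
  .writeStream
  .foreachBatch(upsertLatest _)
  .option("checkpointLocation", "/tmp/checkpoints/product_attributes_latest") // placeholder
  .start()
```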

To summarize:

  1. How can I get the latest message per key while also doing a stream-stream join, either in one job or across multiple jobs?
  2. Is there any workaround for creating an intermediate table written with MERGE INTO (not append-only) and running a streaming job that sources from it?

 

2 REPLIES

liv1
New Contributor II

Thanks for your detailed response, @Retired_mod.

Regarding the reduce approach, it doesn't seem to work as outlined, since reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset.
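If the typed API was meant, groupByKey on a Dataset returns a KeyValueGroupedDataset, which does have reduceGroups. A sketch of what I think was intended, assuming a made-up case class that mirrors the flattened dump columns:

```scala
import java.sql.Timestamp
import spark.implicits._

// Hypothetical record type mirroring the flattened dump columns.
case class ProductRecord(key: Long, timestamp: Timestamp, offset: Long,
                         partition: Int, value_name: String, value_brand: String)

// Typed "latest per key": keep whichever record has the larger (timestamp, offset).
val latest = spark.readStream.table("bronze.product_attributes_dump")
  .as[ProductRecord]
  .groupByKey(_.key)
  .reduceGroups { (a, b) =>
    val aNewer = a.timestamp.after(b.timestamp) ||
      (a.timestamp.equals(b.timestamp) && a.offset > b.offset)
    if (aNewer) a else b
  }
  .map { case (_, record) => record }
```

That said, this is still a streaming aggregation, so it runs into the same output-mode limitations as the window-function approach.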

For the second approach, to clarify, are you suggesting the following?

  1. Stream into the silver intermediate table, writing with a MERGE INTO statement
  2. Batch job to read the output of (1) and write to gold with append
  3. Stream from the gold table in (2) to power a different third table

 

Maatari
New Contributor III

I am confused about this recommendation. I thought the use of the append output mode in combination with aggregate queries is restricted to queries where the aggregation is expressed over event time and a watermark is defined.

Could you clarify?
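For reference, this is the shape of query I understood that restriction to allow, i.e. an event-time window aggregation with a watermark (table names and paths are placeholders):

```scala
import org.apache.spark.sql.functions.{col, window}

// Append mode is accepted here because the aggregation is over an event-time
// window and the watermark bounds how late data can arrive.
val hourlyCounts = spark.readStream.table("bronze.product_attributes_dump")
  .withWatermark("timestamp", "1 hour")
  .groupBy(window(col("timestamp"), "1 hour"), col("key"))
  .count()

hourlyCounts.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/hourly_counts") // placeholder
  .toTable("silver.hourly_counts_per_key")                        // placeholder
```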
