
Structured Streaming from a delta table that is a dump of kafka and get the latest record per key

liv1
New Contributor II

I'm trying to use Structured Streaming in Scala to stream from a Delta table that is a dump of a Kafka topic, where each record/message is an update of the attributes for its key. No messages from Kafka are dropped from the dump, but the value is flattened into its own columns. I have multiple jobs representing different entities, and some require joining two of these dump tables together with a stream-stream join. For example, the first topic holds attributes of a product (key is product_id, value contains attributes like name, brand, size) and we want to join it to the dump of a second topic of prices for the product (key is product_id, value contains attributes like price, valid from, valid to), so that our final gold output table has attributes from both topics.

Example Source Table Schema

| key | timestamp           | offset | partition | value_name | value_brand | ... |
|-----|---------------------|--------|-----------|------------|-------------|-----|
| 1   | 2023-10-11T01:00:00 | 12344  | 1         | apple pie  | BakeryA     | ... |
| 1   | 2023-10-11T01:30:00 | 12345  | 1         | Apple Pie  | BakeryA     | ... |

My streaming job should run hourly and keep only the latest record for each key (i.e. only the latest attributes of the product). In our current batch pipeline we use a Spark window function partitioned by key and ordered by timestamp and offset. To use the same logic in a streaming pipeline we would need .foreachBatch and MERGE INTO to keep only the latest updates; however, I then cannot do stream-stream joins inside foreachBatch.
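For reference, the batch deduplication is roughly along these lines (a rough sketch; the path and column names are illustrative):

%scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank records within each key by timestamp and offset (newest first),
// then keep only the top-ranked row, i.e. the latest attributes per key.
val latestProducts = spark.read.format("delta").load("/tmp/products_delta_table")
  .withColumn("rn", row_number().over(
    Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)))
  .filter(col("rn") === 1)
  .drop("rn")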

To work around needing both foreachBatch and stream-stream joins, I thought of creating an intermediate table that streams from the dump and keeps only the latest record per key, which would also help with auditability and with topics that feed multiple gold tables. The downside is that I then cannot stream from the intermediate table into the gold table, since a table used as the input of a streaming query can only be written in append mode.

To summarize,

  1. How can I get the latest message per key while also doing a stream-stream join, either in one job or across multiple jobs?
  2. Is there any workaround for creating an intermediate table written with MERGE INTO (not append-only) and running a streaming job that sources from it?

 

2 REPLIES

Kaniz
Community Manager

Hi @liv1,

To get the latest message per key in your streaming job and still perform stream-stream joins, you can combine Delta Lake's time travel feature with foreachBatch(): use time travel to maintain the latest snapshot of the data in each Delta table at every batch, and then perform the stream-stream joins on those latest snapshots.
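A minimal sketch of the foreachBatch + MERGE INTO part of this pattern, assuming a pre-created products_latest Delta table and the example column names (both illustrative, not from the original post):

%scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Upsert each micro-batch into a pre-created "latest per key" Delta table.
def upsertLatest(batch: DataFrame, batchId: Long): Unit = {
  // Keep only the newest row per key within this micro-batch.
  val latestInBatch = batch
    .withColumn("rn", row_number().over(
      Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)))
    .filter(col("rn") === 1)
    .drop("rn")

  // "products_latest" is an illustrative table name; it must already exist.
  DeltaTable.forName("products_latest").as("t")
    .merge(latestInBatch.as("s"), "t.key = s.key")
    .whenMatched("s.timestamp > t.timestamp").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream.format("delta").load("/tmp/products_delta_table")
  .writeStream
  .foreachBatch(upsertLatest _)
  .option("checkpointLocation", "/tmp/checkpoints/products_latest")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start()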

Here is an example of how to read from your Delta tables and get the latest message per key while doing a stream-stream join: you can use the reduce operation in Spark Structured Streaming.

The reduce operation allows you to group and aggregate multiple events by a key, and apply custom aggregation logic to the grouped data.

The steps to use this operation are as follows:

  1. Load the delta tables for the two topics as streams using readStream.

  2. Use groupBy to create groups based on the key column.

  3. Apply the reduce operation to each group to reduce the records to a single record per key containing only the latest data. You can achieve this by comparing the timestamps of the records in the group and selecting the one with the latest timestamp.

  4. Join the reduced streams for the two topics using join and specifying the join conditions.

Here is an example of how you can accomplish this in a single job using Spark Scala:

%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()

// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")

// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")

// Group by key and apply reduce to get the latest record per key
def reduceByKey(df: DataFrame): DataFrame = {
  df.groupBy(col("key"))
    .reduce((r1, r2) => if (r1.getAs[Long]("timestamp") > r2.getAs[Long]("timestamp")) r1 else r2)
}

// Reduce streams and join them together
val productPrices = reduceByKey(products).join(reduceByKey(prices), Seq("key"), "inner")

// Start streaming the join results
val query = productPrices.writeStream.format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start("/tmp/product_prices")

Alternatively, if you don't want to use the reduce operation, you can create an intermediate table using the "MERGE INTO" statement and run a streaming job that sources from it. To work around the append-only limitation of Structured Streaming, you can create a batch job that reads the data from the intermediate table and writes it in append mode to the gold table.

Here's an example of how you can accomplish this:

%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()

// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")

// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")

// Build the reduced, joined stream (reduceByKey as defined in the previous example)
val productPricesReduced = reduceByKey(products).join(reduceByKey(prices), Seq("key"), "inner")
val productPricesIntermediate = "productPricesIntermediate"

spark.sql(s"""
  | CREATE TABLE IF NOT EXISTS $productPricesIntermediate (
  |   key LONG,
  |   timestamp LONG,
  |   value_name STRING,
  |   value_brand STRING,
  |   price DOUBLE,
  |   valid_from STRING,
  |   valid_to STRING,
  |   ...
  | )
  | USING DELTA
""".stripMargin)

// Write the reduced data to the intermediate table
productPricesReduced.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .toTable(productPricesIntermediate)

// Create a batch job to read from the intermediate table and write to the gold table
val productPricesGold = "productPricesGold"

spark.sql(s"""
  | CREATE TABLE IF NOT EXISTS $productPricesGold (
  |   key LONG,
  |   timestamp LONG,
  |   value_name STRING,
  |   value_brand STRING,
  |   price DOUBLE,
  |   valid_from STRING,
  |   valid_to STRING,
  |   ...
  | )
  | USING DELTA
""".stripMargin)

spark.read.format("delta").table(productPricesIntermediate)
  .write.format("delta").mode("append").saveAsTable(productPricesGold)

This approach works around the append-only limitation in Structured Streaming while still allowing you to maintain an intermediate table with a "MERGE INTO" statement and build the gold table from it.

liv1
New Contributor II

Thanks for your detailed response @Kaniz

Regarding the reduce approach, it doesn't seem to work as outlined, since reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset.
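(The closest built-in batch equivalent I can see is an aggregation that picks the max of a struct ordered by timestamp and offset, roughly like the untested sketch below with illustrative paths and columns, but that still doesn't answer how to combine it with a stream-stream join.)

%scala
import org.apache.spark.sql.functions._

// Batch-style "latest record per key": take the max of a struct ordered by
// (timestamp, offset), then unpack the winning row's columns.
val products = spark.read.format("delta").load("/tmp/products_delta_table")

val latestPerKey = products
  .groupBy("key")
  .agg(max(struct(col("timestamp"), col("offset"), col("value_name"), col("value_brand"))).as("latest"))
  .select("key", "latest.*")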

For the second approach, to clarify, are you suggesting the following:

  1. Stream into a silver intermediate table, writing with a MERGE INTO statement
  2. A batch job that reads the output of (1) and writes to the gold table in append mode
  3. Stream from the gold table in (2) to power a different third table

 
