10-12-2023 12:41 PM
I'm trying to use Structured Streaming in Scala to stream from a Delta table that is a dump of a Kafka topic, where each record/message is an update of attributes for its key, no messages are dropped from the dump, and the value is flattened into its own columns. I have multiple jobs representing different entities, and some require joining two of these dump tables together with a stream-stream join. For example, the first topic holds attributes of a product (key is product_id; value contains attributes like name, brand, size), and we want to join it to the dump of a second topic of prices for the product (key is product_id; value contains attributes like price, valid from, valid to), so that our final gold output table can have attributes from both topics.
Example Source Table Schema
key | timestamp | offset | partition | value_name | value_brand | ... |
1 | 2023-10-11T01:00:00 | 12344 | 1 | apple pie | BakeryA | ... |
1 | 2023-10-11T01:30:00 | 12345 | 1 | Apple Pie | BakeryA | ... |
My streaming job should run hourly and pick up only the latest record for each key (i.e., the latest attributes of the product). In our batch pipeline we currently do this with a Spark window function partitioned by key and ordered by timestamp and offset, roughly as sketched below. To reuse the same logic in a streaming pipeline we would need .foreachBatch with MERGE INTO to keep only the updates, but then I cannot do stream-stream joins, since they are not possible inside foreachBatch.
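A minimal sketch of that batch dedup (paths and column names simplified to match the example schema above):
%scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Latest record per key: newest timestamp wins, offset breaks ties
val w = Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)

val latestProducts = spark.read.format("delta").load("/tmp/products_delta_table")
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")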
To avoid combining foreachBatch with stream-stream joins, I thought of creating an intermediate table that streams from the dump and keeps only the latest record per key; this would also help with auditability and with topics that feed multiple gold tables. The downside is that I then cannot stream from the intermediate table into the gold table, since the input of a streaming query can only be a table written in append mode.
To summarize: I need (1) only the latest record per key from each dump table and (2) a stream-stream join of two such streams, but stream-stream joins cannot be done inside foreachBatch, and an intermediate table maintained with MERGE INTO cannot be used as a streaming source.
10-16-2023 03:02 AM
Hi @liv1,
To get the latest message per key in your streaming job and perform stream-stream joins, you can use Delta's time travel feature in combination with foreachBatch(): maintain the latest snapshot of the data in each Delta table at every batch, then perform the stream-stream join on the latest snapshots of the Delta tables.
Alternatively, to get the latest message per key while doing a stream-stream join, you can use the reduce operation in Spark Structured Streaming. The reduce operation lets you group multiple events by a key and apply custom aggregation logic to each group.
The steps to use this operation are as follows:
1. Load the Delta tables for the two topics as streams using readStream.
2. Use groupBy to create groups based on the key column.
3. Apply the reduce operation to each group to collapse it to a single record per key containing only the latest data; you can achieve this by comparing the timestamps of the records in the group and selecting the one with the latest timestamp.
4. Join the reduced streams for the two topics using join, specifying the join conditions.
Here is an example of how you can accomplish this in a single job using Spark Scala:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()

// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")

// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")

// Group by key and reduce each group to the record with the latest timestamp
def reduceByKey(df: DataFrame): DataFrame = {
  df.groupBy(col("key"))
    .reduce((r1, r2) => if (r1.getAs[Long]("timestamp") > r2.getAs[Long]("timestamp")) r1 else r2)
}

// Reduce both streams and join them together
val productPrices = reduceByKey(products).join(reduceByKey(prices), Seq("key"), "inner")

// Start streaming the join results
val query = productPrices.writeStream.format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start("/tmp/product_prices")
Alternatively, if you don't want to use the reduce operation, you can maintain an intermediate table using the MERGE INTO statement and run a streaming job that sources from it. To work around the append-only limitation of Structured Streaming, you can create a batch job that reads the data from the intermediate table and writes it in append mode to the gold table.
Here's an example of how you can accomplish this:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()

// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")

// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")

// Reduce both streams (reduceByKey as defined in the previous snippet) and join them
val productPricesReduced = reduceByKey(products).join(reduceByKey(prices), Seq("key"), "inner")

// Create an intermediate table for the reduced data
val productPricesIntermediate = "productPricesIntermediate"
spark.sql(s"""
  |CREATE TABLE IF NOT EXISTS $productPricesIntermediate (
  |  key LONG,
  |  timestamp LONG,
  |  value_name STRING,
  |  value_brand STRING,
  |  price DOUBLE,
  |  valid_from STRING,
  |  valid_to STRING,
  |  ...
  |)
  |USING DELTA
""".stripMargin)

// Write the reduced data to the intermediate table
productPricesReduced.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoint_intermediate")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .toTable(productPricesIntermediate)

// Create the gold table
val productPricesGold = "productPricesGold"
spark.sql(s"""
  |CREATE TABLE IF NOT EXISTS $productPricesGold (
  |  key LONG,
  |  timestamp LONG,
  |  value_name STRING,
  |  value_brand STRING,
  |  price DOUBLE,
  |  valid_from STRING,
  |  valid_to STRING,
  |  ...
  |)
  |USING DELTA
""".stripMargin)

// Batch job: read from the intermediate table and append it to the gold table
spark.read.table(productPricesIntermediate)
  .write.format("delta").mode("append").saveAsTable(productPricesGold)
This approach works around the append-only limitation of Structured Streaming while still letting you maintain an intermediate table with a MERGE INTO statement.
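For completeness, here is a minimal sketch of what the MERGE INTO half could look like inside foreachBatch; the table name productPricesIntermediate and the dedup columns are taken from the examples above, while upsertLatest and the checkpoint path are illustrative:
%scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// For each micro-batch: keep the newest record per key, then upsert into the intermediate table
def upsertLatest(batchDf: DataFrame, batchId: Long): Unit = {
  val w = Window.partitionBy("key").orderBy(col("timestamp").desc, col("offset").desc)
  val latest = batchDf.withColumn("rn", row_number().over(w)).filter(col("rn") === 1).drop("rn")

  DeltaTable.forName("productPricesIntermediate").as("t")
    .merge(latest.as("s"), "t.key = s.key")
    .whenMatched("s.timestamp > t.timestamp").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream.format("delta").load("/tmp/products_delta_table")
  .writeStream
  .foreachBatch(upsertLatest _)
  .option("checkpointLocation", "/tmp/checkpoint_upsert")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start()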
10-20-2023 11:46 AM
Thanks for your detailed response @Kaniz_Fatma!
Regarding the reduce approach, it doesn't seem to work as outlined, since reduce is not a member of org.apache.spark.sql.RelationalGroupedDataset.
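The closest compiling equivalent I can find is the typed Dataset API (groupByKey plus reduceGroups), though even then a stream-stream join downstream of such an aggregation doesn't appear to be supported, so it only seems usable in batch. A minimal sketch, assuming a simplified case class for the schema:
%scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical, simplified record type for illustration
case class ProductRecord(key: Long, timestamp: java.sql.Timestamp, value_name: String, value_brand: String)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val products = spark.read.format("delta")
  .load("/tmp/products_delta_table")
  .as[ProductRecord]

// Typed "reduce by key": keep the record with the latest timestamp for each key
val latest: Dataset[ProductRecord] = products
  .groupByKey(_.key)
  .reduceGroups((r1, r2) => if (r1.timestamp.after(r2.timestamp)) r1 else r2)
  .map(_._2)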
For the second approach, to clarify, are you suggesting a streaming job that maintains the intermediate table (via MERGE INTO in foreachBatch), plus a separate batch job that reads the intermediate table and appends to the gold table?
08-13-2024 09:07 PM
I am confused by this recommendation. I thought the use of append output mode in combination with aggregate queries is restricted to queries where the aggregation is expressed using event time and a watermark is defined.
Could you clarify?
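My understanding is that the supported shape looks roughly like this, with the aggregation keyed on an event-time window and an explicit watermark (paths and thresholds are illustrative):
%scala
import org.apache.spark.sql.functions._

// Append mode only emits a window's result once the watermark passes the window end
val hourlyLatestOffsets = spark.readStream.format("delta").load("/tmp/products_delta_table")
  .withWatermark("timestamp", "2 hours")
  .groupBy(window(col("timestamp"), "1 hour"), col("key"))
  .agg(max("offset").as("latest_offset"))

hourlyLatestOffsets.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoint_windowed")
  .start("/tmp/hourly_latest_offsets")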