Hi @liv1 ,
To get the latest message per key in your streaming job and perform stream-stream joins, you have two main options. One is to use Delta together with foreachBatch(): maintain the latest snapshot of the data per key in a Delta table on each micro-batch (for example with a MERGE INTO), and then join against that snapshot; Delta's time travel feature additionally lets you look at earlier snapshots if you ever need them. A sketch of this variant is included at the end of this reply. The other option is to reduce each stream down to the latest record per key inside the streaming query itself and then join the reduced streams.
Here is an example of the second option: read from your Delta tables, keep only the latest message per key, and perform a stream-stream join. The idea is to group the events by key and apply reduce-style aggregation logic that keeps, for each group, only the record with the latest timestamp.
The steps are as follows:
- Load the Delta tables for the two topics as streams using readStream.
- Use groupBy to group the records by the key column.
- Reduce each group to a single record per key containing only the latest data, by comparing the timestamps of the records in the group and keeping the one with the latest timestamp.
- Join the two reduced streams using join, specifying the join conditions.
Here is an example of how you can accomplish this in a single job using Spark Scala:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()
// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")
// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")
// Group by key and keep only the latest record per key. DataFrames do not
// have a groupBy().reduce() method, so take the max of a (timestamp, payload)
// struct and then unpack the winning payload again.
def reduceByKey(df: DataFrame): DataFrame = {
  df.groupBy(col("key"))
    .agg(max(struct(col("timestamp"), struct(df.columns.map(col): _*).as("payload"))).as("latest"))
    .select(col("latest.payload.*"))
}
// Reduce both streams and join them on the key column. The prices timestamp
// is renamed so the joined result does not contain two "timestamp" columns.
// Note: Spark restricts streaming aggregations followed by stream-stream joins
// in a single query; if you hit that limitation, run the same logic per
// micro-batch inside foreachBatch() instead (see the sketch at the end).
val productPrices = reduceByKey(products)
  .join(reduceByKey(prices).withColumnRenamed("timestamp", "price_timestamp"), Seq("key"), "inner")
// Start streaming the join results to a Delta table. The aggregation has no
// watermark, so the query is written in "complete" output mode (the default
// "append" mode would require a watermark).
val query = productPrices.writeStream.format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .start("/tmp/product_prices")
Alternatively, if you don't want to use the reduce-style aggregation, you can maintain an intermediate table with a "MERGE INTO" statement and run a streaming job that sources from it. To work around the append-only limitation of Structured Streaming, you can create a batch job that reads the data from the intermediate table and writes it in append mode to the gold table.
Here's an example of how you can accomplish this:
%scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
val spark = SparkSession.builder().appName("DeltaStreamJoin").getOrCreate()
// Set the Spark log level to ERROR to avoid verbose log messages
spark.sparkContext.setLogLevel("ERROR")
// Load delta table dumps as streams
val products = spark.readStream.format("delta").load("/tmp/products_delta_table")
val prices = spark.readStream.format("delta").load("/tmp/prices_delta_table")
// Reduce and join the two streams (reduceByKey is the helper defined in the
// previous example) and prepare an intermediate table for the result
val productPricesReduced = reduceByKey(products)
  .join(reduceByKey(prices).withColumnRenamed("timestamp", "price_timestamp"), Seq("key"), "inner")
val productPricesIntermediate = "productPricesIntermediate"
spark.sql(s"""
| CREATE TABLE IF NOT EXISTS $productPricesIntermediate (
| key LONG,
| timestamp LONG,
| value_name STRING,
| value_brand STRING,
| price DOUBLE,
| valid_from STRING,
| valid_to STRING,
| ...
| )
| USING DELTA
""".stripMargin)
// Write the reduced data to the intermediate table
productPricesReduced.writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("1 hour"))
  .toTable(productPricesIntermediate) // write to the table by name, not to a path
// Create a batch job to read from the intermediate table and write to the gold table
val productPricesGold = "productPricesGold"
spark.sql(s"""
| CREATE TABLE IF NOT EXISTS $productPricesGold (
| key LONG,
| timestamp LONG,
| value_name STRING,
| value_brand STRING,
| price DOUBLE,
| valid_from STRING,
| valid_to STRING,
| ...
| )
| USING DELTA
""".stripMargin)
spark.read.table(productPricesIntermediate)
  .write.format("delta").mode("append").saveAsTable(productPricesGold) // append by table name, not by path
This approach works around the append-only limitation of Structured Streaming, while still allowing you to stream from an intermediate table that is kept up to date with a "MERGE INTO" statement.
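For completeness, here is a rough sketch of what the foreachBatch() / "MERGE INTO" variant mentioned above could look like. The table name products_latest, the checkpoint path, and the assumption that the snapshot table already exists are placeholders for illustration; the key and timestamp columns and the reduceByKey helper are the ones from the examples above.
%scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
// Hypothetical Delta table holding the latest snapshot per key
// (assumed to have been created up front with the same schema as the source)
val snapshotTable = "products_latest"
// Upsert each micro-batch, keeping only records that are newer than what is
// already stored for the same key
def upsertLatest(batch: DataFrame, batchId: Long): Unit = {
  val latestInBatch = reduceByKey(batch) // dedupe within the micro-batch first
  DeltaTable.forName(snapshotTable).as("t")
    .merge(latestInBatch.as("s"), "t.key = s.key")
    .whenMatched("s.timestamp > t.timestamp").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
// Maintain the snapshot table from the raw products stream with MERGE INTO
val snapshotQuery = spark.readStream.format("delta")
  .load("/tmp/products_delta_table")
  .writeStream
  .foreachBatch(upsertLatest _)
  .option("checkpointLocation", "/tmp/checkpoint_products_latest")
  .start()
A prices_latest table can be maintained the same way, and the two snapshot tables can then be joined downstream, or used as the intermediate tables that the batch job above appends from.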