Objective
Within a Delta Live Tables pipeline, I'm trying to join two streaming aggregations, but I run into an error. Is such a join possible?
Context
Assume
- table trades stores a list of trades with their associated timestamps
- table trades_1d sums the value of all the trades on a given day
- table stock_price stores a given stock price at some (non-constant) sampling frequency
- table stock_price_1d averages the stock price over a given day
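For concreteness, I'm assuming roughly the following columns (names match the code below):
- trades: timestamp (event time), trade_value
- stock_price: timestamp (event time), value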
This translates to something along these lines:
import pyspark.sql.functions as sqlf

# Read both source tables as streams
sdf_trades = spark.readStream.format("delta").table("trades")
sdf_price = spark.readStream.format("delta").table("stock_price")

# Daily tumbling window keyed on the event-time column
w = sqlf.window("timestamp", "24 hours")

# Sum of all trade values per day
sdf_trades_1d = (
    sdf_trades
    .groupBy(w)
    .agg(sqlf.sum("trade_value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
)

# Average stock price per day
sdf_price_1d = (
    sdf_price
    .groupBy(w)
    .agg(sqlf.avg("value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
).withWatermark("window_end", "48 hours")

# Join the two daily aggregates on the window end
sdf = sdf_trades_1d.join(sdf_price_1d, "window_end", "left")
Issue
When running the pseudo-code above, I get
"Append mode error: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets"
Any suggestions on how I can make this work?
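One direction I've considered, in case it helps frame an answer: as far as I understand, Structured Streaming supports only a single aggregation per streaming query, and the join above puts two aggregations into one query plan. The idea would therefore be to materialize each daily aggregate as its own table in the pipeline and join the materialized results in a third table. A rough sketch (I'm not sure this is the idiomatic DLT pattern; table and column names follow the context above):

import dlt
import pyspark.sql.functions as sqlf

@dlt.table
def trades_1d():
    # One aggregation per streaming query; the watermark is set on the
    # event-time column *before* aggregating so windows can be finalized.
    return (
        dlt.read_stream("trades")
        .withWatermark("timestamp", "48 hours")
        .groupBy(sqlf.window("timestamp", "24 hours"))
        .agg(sqlf.sum("trade_value").alias("trade_value_1d"))
        .withColumn("window_end", sqlf.col("window.end"))
    )

@dlt.table
def stock_price_1d():
    return (
        dlt.read_stream("stock_price")
        .withWatermark("timestamp", "48 hours")
        .groupBy(sqlf.window("timestamp", "24 hours"))
        .agg(sqlf.avg("value").alias("avg_price_1d"))
        .withColumn("window_end", sqlf.col("window.end"))
    )

@dlt.table
def trades_with_price_1d():
    # Batch-read the two materialized aggregates, so this query contains
    # no streaming aggregation at all.
    return dlt.read("trades_1d").join(dlt.read("stock_price_1d"), "window_end", "left")

Would something like this work, or is there a way to do it in a single streaming query?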