In my SQL data transformation pipeline, I'm doing chained/cascading window aggregations: for example, I first compute an average over the last 5 minutes, then compute an average over the past day on top of those 5-minute averages, so that the aggregations stay manageable.
An example:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType
schema = StructType([
    StructField('createTime', TimestampType(), True),
    StructField('orderId', LongType(), True),
    StructField('payAmount', DoubleType(), True),
    StructField('payPlatform', IntegerType(), True),
    StructField('provinceId', IntegerType(), True),
])
streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")
streaming_df.createOrReplaceTempView("payment_msg")
session.sql("DROP VIEW IF EXISTS v0")
# first-level aggregation; redefine the watermark on its output
v0 = session.sql("""
    SELECT
        window.start AS window_start,
        window.end AS window_end,
        provinceId,
        avg(payAmount) AS avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")
# second-level aggregation on top of the 1-minute averages
streaming_df = session.sql("""
    SELECT window, provinceId, avg(avgPayAmount) AS avgPayAmount
    FROM v0
    GROUP BY provinceId, window(window_start, '2 minutes')
""")
This executes without errors, but for some reason nothing is ever written to my sink.
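For reference, this is roughly how I consume the final stream. The console sink and trigger interval below are stand-ins for my real sink, which shows the same behavior (no output):

# Sketch of the sink side: the console sink and 30-second trigger here are
# placeholders for my actual sink; the real pipeline behaves the same way.
query = streaming_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .trigger(processingTime="30 seconds") \
    .start()
query.awaitTermination()

Can anyone help me figure out why no rows ever come out?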