<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Chaining window aggregations in SQL in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/chaining-window-aggregations-in-sql/m-p/67734#M33428</link>
    <description>&lt;P&gt;In my SQL data transformation pipeline, I'm doing chained/cascading window aggregations: for example, I want to do average over the last 5 minutes, then compute average over the past day on top of the 5 minute average, so that my aggregations are more manageable.&lt;/P&gt;&lt;P&gt;An example:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType

schema = StructType(
    [
        StructField('createTime', TimestampType(), True), 
        StructField('orderId', LongType(), True), 
        StructField('payAmount', DoubleType(), True), 
        StructField('payPlatform', IntegerType(), True), 
        StructField('provinceId', IntegerType(), True),
    ])

streaming_df = session.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "payment_msg")\
    .option("startingOffsets","earliest")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
    .select("parsed_value.*")\
    .withWatermark("createTime", "10 seconds")

streaming_df.createOrReplaceTempView("payment_msg")

session.sql("""
DROP VIEW IF EXISTS v0
""")

# redefine watermark
v0 = session.sql("""
SELECT
    window.start AS window_start, window.end AS window_end, provinceId, avg(payAmount) as avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")
streaming_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) as avgPayAmount
    FROM v0
    GROUP BY provinceId, window(window_start, '2 minute')
""")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This does execute but for some reason it does not write anything into my sink. Can anyone help with this?&lt;/P&gt;</description>
    <pubDate>Tue, 30 Apr 2024 18:52:53 GMT</pubDate>
    <dc:creator>chloeh</dc:creator>
    <dc:date>2024-04-30T18:52:53Z</dc:date>
    <item>
      <title>Chaining window aggregations in SQL</title>
      <link>https://community.databricks.com/t5/data-engineering/chaining-window-aggregations-in-sql/m-p/67734#M33428</link>
      <description>&lt;P&gt;In my SQL data transformation pipeline, I'm doing chained/cascading window aggregations: for example, I want to do average over the last 5 minutes, then compute average over the past day on top of the 5 minute average, so that my aggregations are more manageable.&lt;/P&gt;&lt;P&gt;An example:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType

schema = StructType(
    [
        StructField('createTime', TimestampType(), True), 
        StructField('orderId', LongType(), True), 
        StructField('payAmount', DoubleType(), True), 
        StructField('payPlatform', IntegerType(), True), 
        StructField('provinceId', IntegerType(), True),
    ])

streaming_df = session.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "payment_msg")\
    .option("startingOffsets","earliest")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
    .select("parsed_value.*")\
    .withWatermark("createTime", "10 seconds")

streaming_df.createOrReplaceTempView("payment_msg")

session.sql("""
DROP VIEW IF EXISTS v0
""")

# redefine watermark
v0 = session.sql("""
SELECT
    window.start AS window_start, window.end AS window_end, provinceId, avg(payAmount) as avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")
streaming_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) as avgPayAmount
    FROM v0
    GROUP BY provinceId, window(window_start, '2 minute')
""")&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This does execute but for some reason it does not write anything into my sink. Can anyone help with this?&lt;/P&gt;</description>
      <pubDate>Tue, 30 Apr 2024 18:52:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/chaining-window-aggregations-in-sql/m-p/67734#M33428</guid>
      <dc:creator>chloeh</dc:creator>
      <dc:date>2024-04-30T18:52:53Z</dc:date>
    </item>
  </channel>
</rss>

