Chaining window aggregations in SQL

chloeh
New Contributor II

In my SQL data transformation pipeline, I'm doing chained/cascading window aggregations: for example, I want to compute an average over the last 5 minutes, and then compute an average over the past day on top of the 5-minute averages, so that my aggregations stay manageable.

An example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType

session = SparkSession.builder.getOrCreate()

# Schema of the JSON payload in the Kafka messages
schema = StructType(
    [
        StructField('createTime', TimestampType(), True),
        StructField('orderId', LongType(), True),
        StructField('payAmount', DoubleType(), True),
        StructField('payPlatform', IntegerType(), True),
        StructField('provinceId', IntegerType(), True),
    ])

# Read the Kafka topic, parse the JSON payload, and set a watermark on createTime
streaming_df = session.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "payment_msg")\
    .option("startingOffsets", "earliest")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
    .select("parsed_value.*")\
    .withWatermark("createTime", "10 seconds")

streaming_df.createOrReplaceTempView("payment_msg")

session.sql("""
DROP VIEW IF EXISTS v0
""")

# First level: average payAmount per province over 1-minute windows sliding every
# 30 seconds; then redefine the watermark on the window start for the next level.
v0 = session.sql("""
SELECT
    window.start AS window_start, window.end AS window_end, provinceId, avg(payAmount) AS avgPayAmount
    FROM payment_msg
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")

# Second level: average of the 1-minute averages over 2-minute windows
streaming_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) AS avgPayAmount
    FROM v0
    GROUP BY provinceId, window(window_start, '2 minute')
""")

This does execute, but for some reason it does not write anything to my sink. Can anyone help with this?

1 REPLY

Kaniz
Community Manager

Hi @chloeh, you’re working with a Spark SQL data transformation pipeline that involves chained window aggregations.

Let’s look at your code snippet and see if we can identify the issue.

First, let’s break down the steps you’ve implemented:

  1. You’re reading data from a Kafka topic named “payment_msg” using Spark Structured Streaming.
  2. You’ve defined a schema for the incoming data.
  3. You’re extracting the relevant fields from the JSON data using from_json.
  4. You’ve set a watermark on the createTime column to handle late data.
  5. You’ve created a temporary view named “payment_msg” for further processing.
  6. You’ve defined a view named “v0” that computes the average pay amount per province over 1-minute windows that slide every 30 seconds.
  7. You’ve set a watermark on the “window_start” column for “v0”.
  8. Finally, you’re trying to compute the average pay amount per province within 2-minute windows based on the “v0” view.

Given this context, let’s troubleshoot the issue. Here are some potential areas to check:

  1. Sink Configuration: Ensure you’ve correctly configured the sink where you want to write the results. Common sinks include files (e.g., Parquet, CSV), databases (e.g., JDBC), or other streaming platforms (e.g., Kafka). A minimal sink example covering points 1 to 4 is sketched after this list.

  2. Checkpoint Location: Make sure you’ve specified a valid checkpoint location for your streaming query. Checkpoints are essential for fault tolerance and state management in Spark Structured Streaming.

  3. Output Mode: Verify that you’ve set the correct output mode for your query. The available modes are “append,” “complete,” and “update.” Depending on your use case, choose the appropriate mode.

  4. Trigger Interval: Consider setting an appropriate trigger interval for your streaming query. The trigger defines how often the query processes new data. For example, you can use trigger(processingTime="10 seconds").

  5. Debugging: Check the Spark logs for any error messages or warnings related to your query. Look for clues about why no data is being written to the sink.

  6. Data Availability: Ensure that data is actually arriving in the Kafka topic “payment_msg.” You can monitor the topic to verify this; a quick batch-read check is sketched after this list.

  7. Window Size and Slide Duration: Double-check the window size and slide duration in your aggregations. Make sure they align with your desired time intervals (e.g., 5 minutes and 1 day); a sketch with those target durations follows this list.

  8. Testing with Smaller Windows: To debug, try reducing the window sizes (e.g., 1 minute and 2 minutes) and see if data is written to the sink. If it works, gradually increase the window sizes.
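
To make points 1 to 4 concrete, here is a minimal sketch of attaching a sink to the final aggregation. It assumes a Parquet file sink; the output path and checkpoint directory are placeholder values you would replace with your own.

# Minimal sketch: write the final aggregation to a Parquet file sink.
# The path and checkpointLocation values below are placeholders.
query = (
    streaming_df.writeStream
    .format("parquet")                                        # 1. sink configuration (file sink)
    .option("path", "/tmp/payment_agg/output")                # hypothetical output directory
    .option("checkpointLocation", "/tmp/payment_agg/_ckpt")   # 2. checkpoint for state and fault tolerance
    .outputMode("append")                                     # 3. file sinks only support append mode
    .trigger(processingTime="10 seconds")                     # 4. process new data every 10 seconds
    .start()
)
query.awaitTermination()

For quick debugging, you can swap the file sink for .format("console") to see immediately whether any rows are being emitted at all.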
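
For point 6, one quick way to confirm that records actually exist in the topic is a one-off batch read against the same broker and topic as your streaming read. This is only a sanity check, not part of the pipeline.

# One-off batch read of the topic to confirm data is arriving (sanity check only)
batch_df = (
    session.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "payment_msg")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)
print(batch_df.count())  # a non-zero count means the topic has data
batch_df.selectExpr("CAST(value AS STRING)").show(5, truncate=False)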
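
And for points 7 and 8, once the small test windows work, the cascade with your target durations could look like the sketch below (5-minute averages rolled up into a daily average). It reuses the payment_msg and v0 views from your snippet; treat it as an illustration of the durations rather than a drop-in fix.

# First level: 5-minute average pay amount per province
v0 = session.sql("""
SELECT window.start AS window_start, window.end AS window_end,
       provinceId, avg(payAmount) AS avgPayAmount
FROM payment_msg
GROUP BY provinceId, window(createTime, '5 minutes')
""")
v0.withWatermark("window_start", "1 second").createOrReplaceTempView("v0")

# Second level: daily average of the 5-minute averages
daily_df = session.sql("""
SELECT window, provinceId, avg(avgPayAmount) AS avgPayAmount
FROM v0
GROUP BY provinceId, window(window_start, '1 day')
""")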

Remember to review your Spark configuration, especially related to checkpointing and output modes. If you encounter any specific error messages or issues, feel free to share them, and we can dive deeper into troubleshooting! 😊

For more information, you can refer to the Databricks community post on chaining window aggregations in SQL. Additionally, the data.world documentation provides insights into windowed aggregations. If you need further assistance, don’t hesitate to ask! 🚀