Data Engineering

Spark Structured Streaming: How to run N queries on each window

VivekBhalgat
New Contributor II

I have time-series data in k Kafka topics. I would like to read this data into windows of 10 minutes. For each window, I want to run N SQL queries and materialize the results. The specific N queries to run depend on the Kafka topic name. How should I approach this problem?
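
The per-topic query configuration I have in mind looks roughly like the following (the topic names, the SQL, and the current_window view name are made up for illustration):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mapping from Kafka topic name to the SQL queries that should run
// against each 10-minute window of that topic's data.
Map<String, List<String>> queriesByTopic = new HashMap<>();
queriesByTopic.put("HelloWorld1", Arrays.asList(
    "SELECT COUNT(*) AS events FROM current_window WHERE topic = 'HelloWorld1'",
    "SELECT MIN(timestamp) AS first_event, MAX(timestamp) AS last_event FROM current_window WHERE topic = 'HelloWorld1'"));
queriesByTopic.put("HelloWorld2", Arrays.asList(
    "SELECT COUNT(DISTINCT partition) AS partitions_seen FROM current_window WHERE topic = 'HelloWorld2'"));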

Questions:

-- Instead of using count as the aggregation function as in the first example below, is there a way to get access to the Dataset&lt;Row&gt; for each window so that I can write my own logic and materialize the data?

-- There is a different set of SQL queries configured for each Kafka topic.

-- The grouping I currently have looks like this:

   RelationalGroupedDataset kafkaDfStreamingWithWindow =
       kafkaDfStreaming.groupBy(
           functions.window(functions.col(KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP), "60 seconds", "60 seconds"));

   The KeyValueGroupedDataset.flatMapGroups function exists and is probably similar to what I am looking for. Is there a way to convert a RelationalGroupedDataset to a KeyValueGroupedDataset? Any sample code for an end-to-end implementation would help (see the sketch below).

-- Am I thinking about the problem the right way? Is there a better way to approach this problem?

Any sample code or direction would be helpful.
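
For reference, this is roughly what I imagine for the flatMapGroups route (untested sketch). I am not sure flatMapGroups can be called directly on a streaming Dataset, so the sketch assumes a static Dataset&lt;Row&gt; such as the micro-batch handed to foreachBatch; batchDf and the String output type are placeholders of mine, not names from my code:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.Row;

// batchDf is a static Dataset<Row> (e.g. the micro-batch passed to foreachBatch).
KeyValueGroupedDataset<String, Row> byTopic =
    batchDf.groupByKey(
        (MapFunction<Row, String>) row -> row.getAs("topic"),
        Encoders.STRING());

Dataset<String> materialized =
    byTopic.flatMapGroups(
        (FlatMapGroupsFunction<String, Row, String>) (topic, rows) -> {
          // All rows for this topic in the current micro-batch arrive here,
          // so per-topic logic and materialization could live in this block.
          List<String> out = new ArrayList<>();
          while (rows.hasNext()) {
            Row row = rows.next();
            out.add(topic + " -> " + row.getAs("timestamp"));
          }
          return out.iterator();
        },
        Encoders.STRING());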

What I have tried (primitive code)

Dataset<Row> kafkaDfStreaming =
        sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("includeTimestamp", "true")
            .option("startingOffsets", "earliest")
            // endingOffsets is omitted: it only applies to batch reads, not streaming
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
 
Dataset<Row> streamingWindow =
        kafkaDfStreaming
            .groupBy(
                functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
                functions.col("topic))
            .count()
            .select("window.start", "window.end", "topic", "count");
 
    streamingWindow
        .writeStream()
        .format("console")
        .trigger(Trigger.ProcessingTime("600 seconds"))
        .outputMode(OutputMode.Update())
        .start()
        .awaitTermination();
 

The above sample code works and prints the number of rows per topic per window.

What I have tried that has not worked:

// Issue with this code:
// kafkaDfStreamingWithWindow.df() gives access to the entire df, not a df for each window.
 
    Dataset<Row> kafkaDfStreaming = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("includeTimestamp", "true")
            .option("startingOffsets", "earliest")
            // endingOffsets is omitted: it only applies to batch reads, not streaming
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
 
    RelationalGroupedDataset kafkaDfStreamingWithWindow =
        kafkaDfStreaming.groupBy(
            functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
            functions.col("topic"));
 
    DataStreamWriter<Row> kafkaStreaming = kafkaDfStreamingWithWindow.df().writeStream();
 
    DataStreamWriter<Row> afterProcessingSparkStream =
        kafkaStreaming.foreachBatch(
            new VoidFunction2<Dataset<Row>, Long>() {
              @Override
              public void call(Dataset<Row> kafkaDf, Long batchId) throws Exception {
                     // Processing code to materialize data to the database. It is OK to overwrite data.
              }
            });
 
 
    StreamingQuery query =
        afterProcessingSparkStream.trigger(Trigger.ProcessingTime("600 seconds")).outputMode(OutputMode.Update()).start();
 
    query.awaitTermination();
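
If foreachBatch is the right direction, this is roughly what I was planning to do inside it (untested sketch). queriesByTopic is the hypothetical per-topic configuration from above, current_window is a made-up temp view name, and the saveAsTable call is only a placeholder for materializing to the database. With a 600-second processing-time trigger each micro-batch roughly corresponds to one window, although that is arrival time rather than event time:

import java.util.Collections;
import java.util.List;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

kafkaDfStreaming
    .writeStream()
    .trigger(Trigger.ProcessingTime("600 seconds"))
    .outputMode(OutputMode.Update())
    .foreachBatch(
        (VoidFunction2<Dataset<Row>, Long>)
            (batchDf, batchId) -> {
              // One micro-batch = all rows that arrived during the last trigger interval.
              // The Kafka value column is binary, so the configured SQL would need to
              // CAST or parse it before using it.
              batchDf.createOrReplaceTempView("current_window");
              List<Row> topics = batchDf.select("topic").distinct().collectAsList();
              for (Row t : topics) {
                String topic = t.getString(0);
                List<String> queries =
                    queriesByTopic.getOrDefault(topic, Collections.emptyList());
                for (String sql : queries) {
                  // Each configured query filters on its own topic (see the example config above).
                  Dataset<Row> result = batchDf.sparkSession().sql(sql);
                  // Placeholder sink: overwrite one result table per topic.
                  result.write().mode(SaveMode.Overwrite).saveAsTable(topic + "_result");
                }
              }
            })
    .start()
    .awaitTermination();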

