
Spark Structured Streaming: How to run N queries on each window

VivekBhalgat
New Contributor II

I have time-series data in k Kafka topics. I would like to read this data into windows of 10 minutes. For each window, I want to run N SQL queries and materialize the results. The specific N queries to run depend on the Kafka topic name. How should I approach this problem?
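For illustration, the per-topic query configuration I have in mind is essentially a map from topic name to query list (the topic names and queries below are placeholders, not my actual setup):

import java.util.List;
import java.util.Map;

// Placeholder mapping: each Kafka topic points at the SQL queries that
// should run against that topic's windowed data.
Map<String, List<String>> queriesByTopic = Map.of(
    "HelloWorld.metrics", List.of(
        "SELECT avg(value) AS avg_value FROM window_view",
        "SELECT max(value) AS max_value FROM window_view"),
    "HelloWorld.events", List.of(
        "SELECT count(*) AS event_count FROM window_view"));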

Questions:

-- Instead of using count as the aggregation function in the first code sample below, is there a way to get access to a Dataset&lt;Row&gt; for each window so that I can apply my own logic and materialize the data?

-- There is a different set of SQL queries configured for each Kafka topic.

-- The grouping I start from looks like this:

    RelationalGroupedDataset kafkaDfStreamingWithWindow =
        kafkaDfStreaming.groupBy(
            functions.window(functions.col(KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP), "60 seconds", "60 seconds"));

-- The KeyValueGroupedDataset.flatMapGroups function exists and is probably similar to what I am looking for. Is there a way to convert a RelationalGroupedDataset to a KeyValueGroupedDataset? Any sample code for an end-to-end implementation would help (see the sketch after this list).

-- Am I thinking about the problem the right way? Is there a better way to approach it?

Any sample code or direction would be helpful.
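As far as I can tell there is no direct conversion from RelationalGroupedDataset to KeyValueGroupedDataset; calling groupByKey on the Dataset itself is what produces one. Below is a minimal sketch of that, assuming kafkaDf is a batch Dataset&lt;Row&gt; (for example one micro-batch inside foreachBatch), since plain flatMapGroups is a batch-style operation and the documented streaming equivalents are mapGroupsWithState/flatMapGroupsWithState:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

// Group the batch by topic; flatMapGroups then sees each topic's rows.
KeyValueGroupedDataset<String, Row> byTopic =
    kafkaDf.groupByKey(
        (MapFunction<Row, String>) row -> row.<String>getAs("topic"),
        Encoders.STRING());

Dataset<Row> processed =
    byTopic.flatMapGroups(
        (FlatMapGroupsFunction<String, Row, Row>) (topic, rows) -> {
          List<Row> out = new ArrayList<>();
          while (rows.hasNext()) {
            out.add(rows.next()); // replace with per-topic logic
          }
          return out.iterator();
        },
        RowEncoder.apply(kafkaDf.schema())); // pre-Spark-3.5 row encoder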

What I have tried (primitive code)

// load() is needed so that groupBy below operates on a Dataset<Row>.
// Note: bootstrap servers take host:port without an http:// scheme, the
// Kafka source always provides a timestamp column, and ending offsets
// only apply to batch queries, so those two options are dropped.
Dataset<Row> kafkaDfStreaming =
        sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
 
Dataset<Row> streamingWindow =
        kafkaDfStreaming
            .groupBy(
                functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
                functions.col("topic")) // closing quote was missing here
            .count()
            .select("window.start", "window.end", "topic", "count");
 
streamingWindow
        .writeStream()
        .format("console")
        .trigger(Trigger.ProcessingTime("600 seconds"))
        .outputMode(OutputMode.Update())
        .start()
        .awaitTermination();
 

The above sample code works and prints the number of rows per topic per window.

What I have tried that has not worked:

// Issue with this code:
// kafkaDfStreamingWithWindow.df() gives access to the entire df, not the df for each window.
 
    Dataset<Row> kafkaDfStreaming =
        sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load(); // load() was missing here as well
 
    RelationalGroupedDataset kafkaDfStreamingWithWindow =
        kafkaDfStreaming.groupBy(
            functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
            functions.col("topic"));
 
    DataStreamWriter<Row> kafkaStreaming = kafkaDfStreamingWithWindow.df().writeStream();
 
    DataStreamWriter<Row> afterProcessingSparkStream =
        kafkaStreaming.foreachBatch(
            new VoidFunction2<Dataset<Row>, Long>() {
              @Override
              public void call(Dataset<Row> kafkaDf, Long batchId) throws Exception {
                // Processing code to materialize data to the database; it is OK to overwrite data.
              }
            });
 
 
    StreamingQuery query =
        afterProcessingSparkStream
            .trigger(Trigger.ProcessingTime("600 seconds"))
            .outputMode(OutputMode.Update())
            .start();
 
    query.awaitTermination();
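For the per-topic dispatch itself, here is a minimal sketch of what the foreachBatch body could look like, assuming a queriesByTopic map like the one sketched above; the view name "window_view" and the commented-out JDBC sink are placeholders:

// Window the raw stream, then run the configured SQL queries per topic
// on every micro-batch. (Real queries would first need to parse the
// binary Kafka value column.)
kafkaDfStreaming
    .withColumn("window",
        functions.window(functions.col("timestamp"), "600 seconds"))
    .writeStream()
    .foreachBatch(
        (VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> {
          for (Row r : batchDf.select("topic").distinct().collectAsList()) {
            String topic = r.getString(0);
            Dataset<Row> topicDf =
                batchDf.filter(functions.col("topic").equalTo(topic));
            // Expose this topic's slice of the micro-batch to SQL.
            topicDf.createOrReplaceTempView("window_view");
            for (String sql : queriesByTopic.getOrDefault(topic, List.of())) {
              Dataset<Row> result = topicDf.sparkSession().sql(sql);
              // Materialize the result; overwriting is acceptable here.
              // result.write().mode("overwrite").jdbc(url, table, props);
            }
          }
        })
    .trigger(Trigger.ProcessingTime("600 seconds"))
    .start()
    .awaitTermination();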
