<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Spark Structured Streaming: How to run N queries on each window in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-how-to-run-n-queries-on-each-window/m-p/17475#M11496</link>
    <description>&lt;P&gt;I have time-series data in k Kafka topics. I would like to read this data into 10-minute windows. For each window, I want to run N SQL queries and materialize the results. Which N queries to run depends on the Kafka topic name. How should I approach this problem?&lt;/P&gt;&lt;P&gt;Questions:&lt;/P&gt;&lt;P&gt;-- Instead of using count as the aggregation function, as in the first code example, is there a way to get access to a Dataset&amp;lt;Row&amp;gt; for each window so that I can apply my own logic and materialize the data? A different set of SQL queries is configured for each Kafka topic.&lt;/P&gt;&lt;P&gt;-- RelationalGroupedDataset kafkaDfStreamingWithWindow =&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;kafkaDfStreaming.groupBy(&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;functions.window(functions.col(KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP), "60 seconds", "60 seconds"));&lt;/P&gt;&lt;P&gt;KeyValueGroupedDataset.flatMapGroups exists and is probably similar to what I am looking for. Is there a way to convert a RelationalGroupedDataset to a KeyValueGroupedDataset? Is there any sample code for an end-to-end implementation?&lt;/P&gt;&lt;P&gt;-- Am I thinking about the problem the right way? Is there a better way to approach it?&lt;/P&gt;&lt;P&gt;Any sample code or direction would be helpful.&lt;/P&gt;&lt;P&gt;What I have tried (primitive code):&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;// load() turns the DataStreamReader into a streaming Dataset that can be grouped.
Dataset&amp;lt;Row&amp;gt; kafkaDfStreaming =
        sparkSession
            .readStream()
            .format("kafka")
            // Bootstrap servers are host:port pairs, without a URL scheme.
            .option("kafka.bootstrap.servers", "localhost:9092")
            // The Kafka source always provides a timestamp column, and endingOffsets applies only to batch queries.
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
&amp;nbsp;
Dataset&amp;lt;Row&amp;gt; streamingWindow =
        kafkaDfStreaming
            .groupBy(
                functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
                functions.col("topic"))
            .count()
            .select("window.start", "window.end", "topic", "count");
&amp;nbsp;
    streamingWindow
        .writeStream()
        .format("console")
        .trigger(Trigger.ProcessingTime("600 seconds"))
        .outputMode(OutputMode.Update())
        .start()
        .awaitTermination();
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The sample code above works: it prints the number of rows per topic per window.&lt;/P&gt;&lt;P&gt;What I have tried that has not worked:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;// Issue with this code:
// kafkaDfStreamingWithWindow.df() gives access to the entire DataFrame, not the DataFrame for each window.
&amp;nbsp;
    Dataset&amp;lt;Row&amp;gt; kafkaDfStreaming = sparkSession
            .readStream()
            .format("kafka")
            // Bootstrap servers are host:port pairs, without a URL scheme.
            .option("kafka.bootstrap.servers", "localhost:9092")
            // The Kafka source always provides a timestamp column, and endingOffsets applies only to batch queries.
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
&amp;nbsp;
    RelationalGroupedDataset kafkaDfStreamingWithWindow =
        kafkaDfStreaming.groupBy(
            functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
            functions.col("topic"));
&amp;nbsp;
    DataStreamWriter&amp;lt;Row&amp;gt; kafkaStreaming = kafkaDfStreamingWithWindow.df().writeStream();
&amp;nbsp;
    DataStreamWriter&amp;lt;Row&amp;gt; afterProcessingSparkStream =
        kafkaStreaming.foreachBatch(
            new VoidFunction2&amp;lt;Dataset&amp;lt;Row&amp;gt;, Long&amp;gt;() {
              @Override
              public void call(Dataset&amp;lt;Row&amp;gt; kafkaDf, Long batchId) throws Exception {
                     // Processing code to materialize data to the database. It's OK to overwrite data.
              }
            });
&amp;nbsp;
&amp;nbsp;
    StreamingQuery query =
        afterProcessingSparkStream.trigger(Trigger.ProcessingTime("600 seconds")).outputMode(OutputMode.Update()).start();
&amp;nbsp;
    query.awaitTermination();&lt;/CODE&gt;&lt;/PRE&gt;</description>
    <pubDate>Thu, 16 Jun 2022 19:11:36 GMT</pubDate>
    <dc:creator>VivekBhalgat</dc:creator>
    <dc:date>2022-06-16T19:11:36Z</dc:date>
    <item>
      <title>Spark Structured Streaming: How to run N queries on each window</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-how-to-run-n-queries-on-each-window/m-p/17475#M11496</link>
      <description>&lt;P&gt;I have time-series data in k Kafka topics. I would like to read this data into 10-minute windows. For each window, I want to run N SQL queries and materialize the results. Which N queries to run depends on the Kafka topic name. How should I approach this problem?&lt;/P&gt;&lt;P&gt;Questions:&lt;/P&gt;&lt;P&gt;-- Instead of using count as the aggregation function, as in the first code example, is there a way to get access to a Dataset&amp;lt;Row&amp;gt; for each window so that I can apply my own logic and materialize the data? A different set of SQL queries is configured for each Kafka topic.&lt;/P&gt;&lt;P&gt;-- RelationalGroupedDataset kafkaDfStreamingWithWindow =&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;kafkaDfStreaming.groupBy(&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;functions.window(functions.col(KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP), "60 seconds", "60 seconds"));&lt;/P&gt;&lt;P&gt;KeyValueGroupedDataset.flatMapGroups exists and is probably similar to what I am looking for. Is there a way to convert a RelationalGroupedDataset to a KeyValueGroupedDataset? Is there any sample code for an end-to-end implementation?&lt;/P&gt;&lt;P&gt;-- Am I thinking about the problem the right way? Is there a better way to approach it?&lt;/P&gt;&lt;P&gt;Any sample code or direction would be helpful.&lt;/P&gt;&lt;P&gt;What I have tried (primitive code):&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;// load() turns the DataStreamReader into a streaming Dataset that can be grouped.
Dataset&amp;lt;Row&amp;gt; kafkaDfStreaming =
        sparkSession
            .readStream()
            .format("kafka")
            // Bootstrap servers are host:port pairs, without a URL scheme.
            .option("kafka.bootstrap.servers", "localhost:9092")
            // The Kafka source always provides a timestamp column, and endingOffsets applies only to batch queries.
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
&amp;nbsp;
Dataset&amp;lt;Row&amp;gt; streamingWindow =
        kafkaDfStreaming
            .groupBy(
                functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
                functions.col("topic"))
            .count()
            .select("window.start", "window.end", "topic", "count");
&amp;nbsp;
    streamingWindow
        .writeStream()
        .format("console")
        .trigger(Trigger.ProcessingTime("600 seconds"))
        .outputMode(OutputMode.Update())
        .start()
        .awaitTermination();
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The sample code above works: it prints the number of rows per topic per window.&lt;/P&gt;&lt;P&gt;What I have tried that has not worked:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;// Issue with this code:
// kafkaDfStreamingWithWindow.df() gives access to the entire DataFrame, not the DataFrame for each window.
&amp;nbsp;
    Dataset&amp;lt;Row&amp;gt; kafkaDfStreaming = sparkSession
            .readStream()
            .format("kafka")
            // Bootstrap servers are host:port pairs, without a URL scheme.
            .option("kafka.bootstrap.servers", "localhost:9092")
            // The Kafka source always provides a timestamp column, and endingOffsets applies only to batch queries.
            .option("startingOffsets", "earliest")
            .option("includeHeaders", "true")
            .option("subscribePattern", "HelloWorld.*")
            .load();
&amp;nbsp;
    RelationalGroupedDataset kafkaDfStreamingWithWindow =
        kafkaDfStreaming.groupBy(
            functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
            functions.col("topic"));
&amp;nbsp;
    DataStreamWriter&amp;lt;Row&amp;gt; kafkaStreaming = kafkaDfStreamingWithWindow.df().writeStream();
&amp;nbsp;
    DataStreamWriter&amp;lt;Row&amp;gt; afterProcessingSparkStream =
        kafkaStreaming.foreachBatch(
            new VoidFunction2&amp;lt;Dataset&amp;lt;Row&amp;gt;, Long&amp;gt;() {
              @Override
              public void call(Dataset&amp;lt;Row&amp;gt; kafkaDf, Long batchId) throws Exception {
                     // Processing code to materialize data to the database. It's OK to overwrite data.
              }
            });
&amp;nbsp;
&amp;nbsp;
    StreamingQuery query =
        afterProcessingSparkStream.trigger(Trigger.ProcessingTime("600 seconds")).outputMode(OutputMode.Update()).start();
&amp;nbsp;
    query.awaitTermination();&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Thu, 16 Jun 2022 19:11:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-how-to-run-n-queries-on-each-window/m-p/17475#M11496</guid>
      <dc:creator>VivekBhalgat</dc:creator>
      <dc:date>2022-06-16T19:11:36Z</dc:date>
    </item>
  </channel>
</rss>

