I have time-series data in k Kafka topics. I would like to read this data into windows of length 10 minutes. For each window, I want to run N SQL queries and materialize the result. The specific N queries to run depend on the Kafka topic name. How should I approach this problem?
Questions:
-- Instead of using count as the aggregation function in the first example below, is there a way to get access to the Dataset&lt;Row&gt; for each window so that I can write my own logic and materialize the data?
-- There is a different set of SQL queries configured for each Kafka topic (a sketch of the configuration I have in mind follows this list).
-- My windowed grouping currently produces a RelationalGroupedDataset:
RelationalGroupedDataset kafkaDfStreamingWithWindow =
    kafkaDfStreaming.groupBy(
        functions.window(functions.col(KafkaDataset.KAFKA_DF_COLUMN_TIMESTAMP), "60 seconds", "60 seconds"));
The KeyValueGroupedDataset.flatMapGroups function exists and is probably close to what I am looking for. Is there a way to convert a RelationalGroupedDataset to a KeyValueGroupedDataset, and is there any sample code for an end-to-end implementation? (A sketch of the conversion I am imagining follows the working example below.)
-- Am I thinking about the problem the right way? Is there a better way to approach it?
Any sample code or direction would be helpful.
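To make the per-topic point concrete, the query configuration I have in mind looks roughly like the following. Everything here is made up for illustration: the topic names, the query text, and the "window_rows" view name (a placeholder for whatever temp view the windowed data ends up registered as).
// Hypothetical configuration: topic name -> SQL statements to run on that topic's
// rows for each 10-minute window. Assumes: import java.util.*;
Map<String, List<String>> queriesByTopic = new HashMap<>();
queriesByTopic.put("HelloWorld.metrics", Arrays.asList(
    "SELECT avg(CAST(value AS DOUBLE)) AS avg_value FROM window_rows",
    "SELECT count(*) AS row_count FROM window_rows"));
queriesByTopic.put("HelloWorld.events", Collections.singletonList(
    "SELECT key, count(*) AS event_count FROM window_rows GROUP BY key"));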
What I have tried (primitive code)
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// Read the Kafka topics matching the pattern as a streaming Dataset<Row>.
// Note: "endingOffsets" applies only to batch reads, so it is omitted here.
Dataset<Row> kafkaDfStreaming =
    sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("includeTimestamp", "true")
        .option("startingOffsets", "earliest")
        .option("includeHeaders", "true")
        .option("subscribePattern", "HelloWorld.*")
        .load();

// Count rows per 10-minute window per topic and print the result to the console.
Dataset<Row> streamingWindow =
    kafkaDfStreaming
        .groupBy(
            functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
            functions.col("topic"))
        .count()
        .select("window.start", "window.end", "topic", "count");

streamingWindow
    .writeStream()
    .format("console")
    .trigger(Trigger.ProcessingTime("600 seconds"))
    .outputMode(OutputMode.Update())
    .start()
    .awaitTermination();
The above sample code works and prints the number of rows per topic per window.
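On the RelationalGroupedDataset vs. KeyValueGroupedDataset question above, this is the shape of conversion I am imagining. It is only a sketch written against a plain Dataset&lt;Row&gt; (someRowsDf is a placeholder for rows that already have timestamp and topic columns, for example a batch handed to foreachBatch); whether the same groupByKey/flatMapGroups call is allowed directly on the streaming Dataset in place of the groupBy above is exactly what I am unsure about.
// Sketch only. Assumes additional imports:
// import java.sql.Timestamp; import java.util.Collections; import scala.Tuple2;
// import org.apache.spark.api.java.function.FlatMapGroupsFunction;
// import org.apache.spark.api.java.function.MapFunction;
// import org.apache.spark.sql.Encoders;
// import org.apache.spark.sql.KeyValueGroupedDataset;
Dataset<Row> windowedDf =
    someRowsDf.withColumn(
        "window", functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"));

// Key each row by (window start, topic) so that each group is one window of one topic.
KeyValueGroupedDataset<Tuple2<Timestamp, String>, Row> grouped =
    windowedDf.groupByKey(
        (MapFunction<Row, Tuple2<Timestamp, String>>) row ->
            new Tuple2<>(
                row.<Row>getAs("window").<Timestamp>getAs("start"),
                row.<String>getAs("topic")),
        Encoders.tuple(Encoders.TIMESTAMP(), Encoders.STRING()));

// flatMapGroups hands me an Iterator<Row> per (window, topic) pair; this is where
// I would like to run the topic-specific SQL and materialize the result. The body
// below only counts rows to keep the sketch self-contained.
Dataset<String> perGroupSummaries =
    grouped.flatMapGroups(
        (FlatMapGroupsFunction<Tuple2<Timestamp, String>, Row, String>) (key, rows) -> {
          long n = 0;
          while (rows.hasNext()) { rows.next(); n++; }
          return Collections.singletonList(
              key._2() + " @ " + key._1() + " -> " + n + " rows").iterator();
        },
        Encoders.STRING());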
What I have tried that has not worked:
// Issue with this code:
// kafkaDfStreamingWithWindow.df() gives access to the entire DataFrame,
// not to a DataFrame scoped to each window.
Dataset<Row> kafkaDfStreaming =
    sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("includeTimestamp", "true")
        .option("startingOffsets", "earliest")
        .option("includeHeaders", "true")
        .option("subscribePattern", "HelloWorld.*")
        .load();

RelationalGroupedDataset kafkaDfStreamingWithWindow =
    kafkaDfStreaming.groupBy(
        functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"),
        functions.col("topic"));

DataStreamWriter<Row> kafkaStreaming = kafkaDfStreamingWithWindow.df().writeStream();
DataStreamWriter<Row> afterProcessingSparkStream =
    kafkaStreaming.foreachBatch(
        new VoidFunction2<Dataset<Row>, Long>() {
          @Override
          public void call(Dataset<Row> kafkaDf, Long batchId) throws Exception {
            // Processing code to materialize data to the database. It is OK to overwrite data.
          }
        });

StreamingQuery query =
    afterProcessingSparkStream
        .trigger(Trigger.ProcessingTime("600 seconds"))
        .outputMode(OutputMode.Update())
        .start();
query.awaitTermination();
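For completeness, this is the kind of per-batch processing I am aiming for inside foreachBatch, as a rough sketch under my own assumptions: queriesByTopic is the hypothetical configuration above, "window_rows" is a placeholder view name, and the JDBC sink and its URL are invented stand-ins for my real database.
// Rough sketch of the foreachBatch body. The batch handed in here is a plain
// (non-streaming) Dataset<Row>, so arbitrary per-topic logic is possible.
// Assumes additional imports: import java.util.*; import org.apache.spark.sql.SaveMode;
public void call(Dataset<Row> kafkaDf, Long batchId) throws Exception {
    Dataset<Row> windowed =
        kafkaDf
            .withColumn("window",
                functions.window(functions.col("timestamp"), "600 seconds", "600 seconds"))
            .withColumn("value", functions.col("value").cast("string"));

    // Process each topic present in this batch with its own configured queries.
    // Note: if a batch can span more than one window, the SQL would also need to
    // group or filter by the window column.
    List<Row> topics = windowed.select("topic").distinct().collectAsList();
    for (Row topicRow : topics) {
        String topic = topicRow.getString(0);
        Dataset<Row> topicDf = windowed.filter(functions.col("topic").equalTo(topic));
        topicDf.createOrReplaceTempView("window_rows"); // placeholder view name

        for (String sql : queriesByTopic.getOrDefault(topic, Collections.emptyList())) {
            Dataset<Row> result = topicDf.sparkSession().sql(sql);
            // Materialize the result; overwriting is fine for my use case.
            result.write()
                .mode(SaveMode.Overwrite)
                .format("jdbc") // invented sink, stand-in for the real database
                .option("url", "jdbc:postgresql://localhost/results")
                .option("dbtable", "results_" + topic.replace('.', '_'))
                .save();
        }
    }
}
Does driving the whole thing through foreachBatch like this, with the windowing redone per batch, sound like the right direction, or is the groupByKey/flatMapGroups route better?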