@Dev Podavan:
The issue you're facing comes from how Spark Structured Streaming handles windowed aggregations in append output mode. With a tumbling window and append mode, a window's result is emitted only once Spark decides the window can no longer receive late data, and that decision is driven by the event-time watermark, which advances only as new messages arrive. If there is a gap in incoming data, the open window is not finalized, so its result looks delayed or even lost.
The mechanism that closes windows is the watermark you attach with withWatermark. Spark computes the watermark as the maximum event time it has seen so far minus the delay you specify; once the watermark passes the end of a window, that window is treated as complete and its aggregate is appended to the output.
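If you want to observe this emission behavior in isolation, here is a minimal, Kafka-free sketch of my own using Spark's built-in rate source (which generates one timestamped row per second); none of it comes from your pipeline, it just makes the append-mode semantics visible:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, window

spark = SparkSession.builder.appName("watermark-demo").getOrCreate()

# The rate source emits (timestamp, value) rows, one per second.
demo = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withWatermark("timestamp", "10 seconds")   # tolerate 10 seconds of lateness
    .groupBy(window("timestamp", "5 seconds"))  # 5-second tumbling windows
    .agg(count("*").alias("events")))

# In append mode, a window prints only after the watermark
# (max timestamp seen minus 10 seconds) passes the window's end.
query = (demo.writeStream
    .format("console")
    .outputMode("append")
    .start())

query.awaitTermination()

Running this, you'll notice each 5-second window appears on the console only after later rows have pushed the watermark past that window's end.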
In your code you have already defined a watermark on the "dt" column with withWatermark, but the delay is only 1 second. You likely need a larger delay that covers the lateness you actually expect, say 5 minutes or longer, depending on the maximum gap between event time and arrival time in your data. For example, with a 5-minute delay, the window [10:00:00, 10:00:10) is emitted once Spark sees an event with dt at or after 10:05:10. The trade-off is that a larger delay tolerates later events but also finalizes (and emits) each window that much later.
Here's an updated snippet with a watermark delay of 5 minutes, with the imports it needs added:
# KAFKA_BROKER, KAFKA_TOPIC, and json_schema are defined as in your original code;
# `spark` is your existing SparkSession (with the spark-sql-kafka package on the classpath).
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.functions import sum as sum_  # avoid shadowing Python's built-in sum

kafka_stream_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("includeHeaders", "true")
    .load())

sel = (kafka_stream_df
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select(from_json(col("value"), json_schema).alias("data"))  # value is already a string after the CAST
    .select("data.*")
    .withWatermark("dt", "5 minutes")      # accept events up to 5 minutes late
    .groupBy(window("dt", "10 seconds"))   # 10-second tumbling windows on event time
    .agg(sum_("price")))

console = (sel
    .writeStream
    .trigger(processingTime='10 seconds')
    .format("console")
    .outputMode("append")                  # each window is emitted once, after the watermark passes its end
    .start())

console.awaitTermination()  # keep the driver alive while the query runs
With this updated code, each 10-second window is flushed to the console once an event arrives whose dt is more than 5 minutes past the window's end, i.e., once the watermark crosses the window boundary. One caveat worth knowing: Spark advances the watermark from the event times of incoming data, not from the wall clock, so if new data stops arriving entirely, the last open window stays buffered until the next event shows up. Tune the watermark delay to the maximum lateness your data can realistically exhibit.
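If you want to confirm that the watermark is actually advancing, you can poll the query's progress report; lastProgress is part of the standard StreamingQuery API, and when a watermark is defined it appears under the "eventTime" section of the progress JSON. A small loop like the one below (run in place of the awaitTermination() call above) prints it periodically:

import time

# Print the current watermark roughly once per trigger interval.
while console.isActive:
    progress = console.lastProgress  # dict form of the latest StreamingQueryProgress, or None
    if progress is not None:
        print("current watermark:", progress.get("eventTime", {}).get("watermark"))
    time.sleep(10)

If the printed watermark stops moving while your producer is idle, that confirms the behavior described above: the watermark, and therefore window finalization, only progresses when new events arrive.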