04-01-2023 04:31 AM
The problem is very simple: when you use a TUMBLING window with append mode, the window is closed only when the next message arrives (plus the watermark logic).
In the current implementation, if the incoming stream stops, the last window will NEVER close and we LOSE the last window's data.
How can we force the last window to close/flush if new data stops arriving?
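To make the mechanism concrete, here is a toy model in plain Python (not Spark code; `run_append` and `window_start` are hypothetical names) of an append-mode windowed sum with a watermark. It assumes the semantics described in the Structured Streaming guide: the watermark is the maximum event time seen so far minus the delay threshold, and a window's final total is emitted once the watermark reaches the window's end. The parameters mirror the question's 10-second window and 1-second delay.

```python
from datetime import datetime, timedelta

# Mirrors the question: 10-second tumbling windows, withWatermark("dt", "1 seconds")
WINDOW = timedelta(seconds=10)
DELAY = timedelta(seconds=1)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the tumbling window that contains event time ts."""
    return ts - (ts - EPOCH) % WINDOW


def run_append(batches):
    """Toy append-mode windowed sum. batches: list of lists of (event_time, price).

    Returns (emitted, open_windows); emitted holds (batch_index, window_start, total).
    """
    open_windows = {}  # window start -> running sum (stands in for the state store)
    max_event = None   # largest event time seen so far
    emitted = []
    for i, batch in enumerate(batches):
        for ts, price in batch:
            ws = window_start(ts)
            open_windows[ws] = open_windows.get(ws, 0) + price
            max_event = ts if max_event is None else max(max_event, ts)
        if max_event is None:
            continue  # no events yet, so no watermark
        watermark = max_event - DELAY  # only moves when new events arrive
        for ws in sorted(open_windows):  # sorted() copies keys, so popping is safe
            if ws + WINDOW <= watermark:  # watermark reached the window end
                emitted.append((i, ws, open_windows.pop(ws)))
    return emitted, open_windows


t = datetime(2023, 4, 1, 12, 0, 0)
batches = [
    [(t + timedelta(seconds=1), 1), (t + timedelta(seconds=5), 2)],
    [(t + timedelta(seconds=12), 3)],
    [], [],  # the stream goes quiet
]
emitted, still_open = run_append(batches)
print(emitted)     # [12:00:00, 12:00:10) with total 3, emitted only in batch 1
print(still_open)  # [12:00:10, 12:00:20) stays open however many empty batches run
```

The empty batches change nothing because, without new events, the maximum event time and therefore the watermark stay where they were.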
Business situation:
Everything works correctly, then new messages stop arriving and the next message comes in 5 hours later, so the client gets the last window's result after 5 hours instead of after the 10-second window delay.
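Spark v3.3.2. Code that reproduces the problem:
from pyspark.sql.functions import col, from_json, sum, window  # this sum shadows Python's built-in sum
# spark (SparkSession), KAFKA_BROKER, KAFKA_TOPIC and json_schema (with timestamp field "dt" and numeric field "price") are defined elsewhere
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BROKER) \
.option("subscribe", KAFKA_TOPIC) \
.option("includeHeaders", "true") \
.load()
# Parse the Kafka value as JSON and sum "price" per 10-second tumbling window on event time "dt"
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value"), json_schema).alias("data"))
.select("data.*")
.withWatermark("dt", "1 seconds")  # accept events up to 1 second late
.groupBy(window("dt", "10 seconds"))
.agg(sum("price"))
)
# Append mode: a window is written only after the watermark passes its end
console = sel \
.writeStream \
.trigger(processingTime='10 seconds') \
.format("console") \
.outputMode("append") \
.start()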
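One hedged workaround, if the consumer can handle repeated rows for the same window, is `.outputMode("update")`: Spark then writes each window's current aggregate at the end of every micro-batch that changed it, instead of waiting for the watermark. The plain-Python toy model below (hypothetical `run_update`, not Spark code, watermark eviction left out) uses the same batches as a quiet-stream scenario to show that the last window's total is emitted right away.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)  # tumbling window length from the question
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Start of the tumbling window that contains event time ts."""
    return ts - (ts - EPOCH) % WINDOW


def run_update(batches):
    """Toy update mode: after each micro-batch, emit every window whose
    running sum changed in that batch. Returns (batch_index, window_start, total)."""
    totals = {}
    emitted = []
    for i, batch in enumerate(batches):
        changed = set()
        for ts, price in batch:
            ws = window_start(ts)
            totals[ws] = totals.get(ws, 0) + price
            changed.add(ws)
        for ws in sorted(changed):
            emitted.append((i, ws, totals[ws]))
    return emitted


t = datetime(2023, 4, 1, 12, 0, 0)
batches = [
    [(t + timedelta(seconds=1), 1), (t + timedelta(seconds=5), 2)],
    [(t + timedelta(seconds=12), 3)],
    [], [],  # the stream goes quiet
]
emitted = run_update(batches)
for row in emitted:
    print(row)  # the [12:00:10, 12:00:20) window already appears in batch 1
```

The trade-off is that a window can be written more than once as rows arrive, so later rows for the same window should replace earlier ones downstream. Not every sink supports update mode; the file sink, for example, is append-only.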
04-04-2023 06:42 PM
@Dev Podavan:
The issue you're facing is related to how Apache Spark handles window operations in Structured Streaming when a tumbling window is used with the append output mode. By default, a window is not closed or flushed until a later message arrives, which can result in data being delayed or lost if there is a gap in incoming data.
To force the window to close or flush even when new data stops arriving, you can set a watermark with a delay threshold on the windowed aggregation. The watermark specifies how long Spark waits for late data before it considers a window complete, even if no new data arrives.
In your code, you have already defined a watermark on the "dt" column with a delay threshold of 1 second using the withWatermark function. However, you may need to increase the threshold to allow for potential delays in data arrival, for example to 5 minutes or longer, depending on the expected maximum delay in your data.
Here's an updated code snippet with a 5-minute watermark delay threshold:
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BROKER) \
.option("subscribe", KAFKA_TOPIC) \
.option("includeHeaders", "true") \
.load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value").cast("string"), json_schema).alias("data"))
.select("data.*")
.withWatermark("dt", "5 minutes")  # raise the watermark delay threshold to 5 minutes
.groupBy(window("dt", "10 seconds"))
.agg(sum("price"))
)
console = sel \
.writeStream \
.trigger(processingTime='10 seconds') \
.format("console") \
.outputMode("append")\
.start()
With this updated code, even if new data stops arriving, the window will close or flush after the 5-minute watermark delay, ensuring that data is not delayed indefinitely or lost. You can adjust the delay threshold as needed based on your use case and data characteristics.
04-04-2023 11:44 PM
Hi @Dev Podavan,
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. Would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!
04-13-2023 03:37 AM
No, the problem remains the same. Increasing the timeout a little doesn't change anything: the window did not close, and still does not close, until a new message arrives, because the watermark only moves forward when new events come in.
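The arithmetic behind this reply can be sketched in plain Python (hypothetical helper names, not a Spark API): the watermark is the maximum event time seen minus the delay threshold, so the earliest event that can close a window has event time equal to the window end plus the delay. A larger delay moves that point later, and no amount of wall-clock time moves it while events are missing.

```python
from datetime import datetime, timedelta


def closing_event_time(window_end, delay):
    """Earliest event time that pushes the watermark up to the window end."""
    return window_end + delay


def is_closed(max_event_time, window_end, delay):
    """True once the watermark (max event time - delay) has reached window_end."""
    return max_event_time - delay >= window_end


window_end = datetime(2023, 4, 1, 12, 0, 10)  # last window is [12:00:00, 12:00:10)
last_event = datetime(2023, 4, 1, 12, 0, 5)   # last message before the gap

for delay in (timedelta(seconds=1), timedelta(minutes=5)):
    # Prints the delay, the event time needed to close the window, and whether
    # the last event seen is enough (it is not, for either delay).
    print(delay, closing_event_time(window_end, delay),
          is_closed(last_event, window_end, delay))
```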
12-05-2024 01:00 AM
Do you have any solution for this?
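A commonly used workaround (not a built-in Spark feature) is to have a separate producer publish a small "heartbeat" record to the same topic every few seconds, with "dt" set to the current time, so the maximum event time and therefore the watermark keep advancing when real traffic stops. The sketch below only builds the payload; `make_heartbeat` is hypothetical, and it assumes the question's `json_schema` has a timestamp field "dt" (parsed with Spark's default JSON timestamp format) and a numeric field "price". A price of 0 leaves every windowed sum unchanged.

```python
import json
from datetime import datetime, timezone


def make_heartbeat(now=None):
    """Build a hypothetical heartbeat record for the question's topic.

    Assumes fields "dt" (ISO-8601 timestamp) and "price"; price 0 does not
    change any window's sum.
    """
    now = now or datetime.now(timezone.utc)
    return json.dumps({
        "dt": now.isoformat(timespec="milliseconds"),
        "price": 0,
    })


payload = make_heartbeat(datetime(2023, 4, 1, 12, 0, 11, tzinfo=timezone.utc))
print(payload)
```

Sending the payload on a timer is left to whichever Kafka client is already in use. The heartbeat timestamps should use the same clock and time zone as the real events, and windows that contain only heartbeats will be emitted with a total of 0, which the consumer may need to ignore.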