Issue with Multiple Stateful Operations in Databricks Structured Streaming
02-03-2025 05:05 AM
Hi everyone,
I'm working with Databricks Structured Streaming and have encountered an issue with stateful operations. Below is my pseudo-code:
from pyspark.sql import functions as F

# Watermark on the event-time column
df = df.withWatermark("timestamp", "1 second")
df_header = df.withColumn("message_id", F.col("payload.id"))
# Explode the nested values and aggregate them per window, message_id and name
df_values = df.withColumn("message_id", F.col("payload.id")) \
    .withColumn("values_exploded", F.explode("payload.values")) \
    .withColumn("name", F.col("values_exploded.name")) \
    .groupBy(F.window(F.col("timestamp"), "10 seconds"), F.col("message_id"), F.col("name")) \
    .agg(F.collect_list("values_exploded").alias("values_grouped"))
...
# Second windowed aggregation, chained on the first aggregation's window
df_values_grouped = df_values.groupBy(F.window(df_values.window, "10 seconds"), F.col("message_id")) \
    .agg(F.collect_list(F.struct("*")).alias("values"))
# Stream-stream join of the header stream with the grouped values
final_df = df_header.join(df_values_grouped, "message_id", "inner")
From my understanding, multiple stateful operations in a single streaming query have been supported in Spark/Databricks since the Databricks Runtime 13.1 / Spark 3.5.0 release. However, I am getting the following error:
Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operations which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded.
Why am I getting this error, and how can I fix it?
02-03-2025 06:53 AM
Hello @fperry,
This error occurs because your query chains stateful operators: an upstream operator can emit rows that are older than the current global watermark plus the allowed late record delay, and those rows arrive as "late rows" at the downstream stateful operators, where they can be discarded. You might need to increase the watermark duration or the allowed late record delay to accommodate the lateness your data actually exhibits; this can help prevent late rows from being discarded.
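For example, a longer watermark delay would look like this (a minimal sketch; the 10-minute value is purely illustrative and would need to match the actual lateness of your data):
# Allow records to arrive up to 10 minutes late before their window state is finalized
df = df.withWatermark("timestamp", "10 minutes")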
If you understand the risks and still need to run the query, you can disable the correctness check by setting the following configuration:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
However, this should be done with caution, as it can lead to real correctness issues (silently discarded rows) in your streaming application.
02-03-2025 07:02 AM
Hello @Alberto_Umana,
Thank you for your reply. Can you maybe explain how adjusting the watermark duration would fix this issue? I just tested it with a duration of 10 minutes and left everything else the same. However, I'm still facing the same error.
02-03-2025 08:32 AM
Hi @fperry,
Which DBR version are you using? Could you please try with Databricks Runtime 13.3 LTS?
02-03-2025 08:55 AM
Hi @Alberto_Umana,
I'm using DATABRICKS_RUNTIME_VERSION: 16.1
02-03-2025 12:20 PM
According to this blog post, this should basically work, right? However, I'm getting the same error:
Multiple Stateful Streaming Operators | Databricks Blog
Or am I missing something?
from pyspark.sql import functions as F

# Built-in rate source emitting one row per second with a `timestamp` column
rate_df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
rate_df = rate_df.withWatermark("timestamp", "2 seconds")
# display(rate_df)
# First stateful operator: counts per 10-second window
counts1 = rate_df.groupBy(F.window("timestamp", "10 seconds")).count()
# Second stateful operator, chained on the first window via window_time()
counts2 = counts1.groupBy(F.window(F.window_time(counts1.window), "20 seconds")).count()
display(counts2)

