02-03-2025 05:05 AM
Hi everyone,
I'm working with Databricks structured streaming and have encountered an issue with stateful operations. Below is my pseudo-code:
df = df.withWatermark("timestamp", "1 second")
df_header = df.withColumn("message_id", F.col("payload.id"))
df_values = df.withColumn("message_id", F.col("payload.id")) \
.withColumn("values_exploded", F.explode("payload.values")) \
.withColumn("name", F.col("values_exploded.name")) \
.groupBy(F.window(F.col("timestamp"), "10 seconds"), F.col("message_id"), F.col("name")) \
.agg(F.collect_list("values_exploded").alias("values_grouped"))
...
df_values_grouped = df_values.groupBy(F.window(df_values.window, "10 seconds"), F.col("message_id")) \
.agg(F.collect_list(F.struct("*")).alias("values"))
final_df = df_header.join(df_values_grouped, "message_id", "inner")
From my understanding, it should be possible to do multiple stateful operations in Spark/Databricks since the 13.1/3.5.0 release. However, I am getting the following error:
Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operations which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded.
Why am I getting this error, and how can I fix it?
02-03-2025 06:53 AM
Hello @fperry,
This error occurs because the query contains stateful operations that can emit rows older than the current watermark plus the allowed late record delay. These rows are considered "late rows" in downstream stateful operations and can be discarded. You might need to adjust the watermark duration or the allowed late record delay to accommodate the lateness of your data. This can help prevent the discarding of late rows.
If you understand the risks and still need to run the query, you can disable the correctness check by setting the following configuration:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
However, this should be done with caution as it can lead to potential correctness issues in your streaming application
02-03-2025 07:02 AM
Hello @Alberto_Umana,
Thank you for your reply. Can you maybe explain how adjusting the watermark duration would fix this issue? I just tested it with a duration of 10 minutes and left everything else the same. However, I'm still facing the same error.
02-03-2025 08:32 AM
Hi @fperry,
Which DBR version are you using? Could you please try with Databricks Runtime 13.3 LTS?
02-03-2025 08:55 AM
Hi @Alberto_Umana,
I'm using DATABRICKS_RUNTIME_VERSION: 16.1
02-03-2025 12:20 PM
This should according to this blog post basically work, right? However, I'm getting the same error
Multiple Stateful Streaming Operators | Databricks Blog
Or am I missing something?
rate_df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
rate_df = rate_df.withWatermark("timestamp", "2 seconds")
# display(rate_df)
counts1 = rate_df.groupBy(F.window("timestamp", "10 seconds")).count()
counts2 = counts1.groupBy(F.window(F.window_time(counts1.window), "20 seconds")).count()
display(counts2)
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group