cancel
Showing results for 
Search instead for 
Did you mean: 
Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results for 
Search instead for 
Did you mean: 

Issue with Multiple Stateful Operations in Databricks Structured Streaming

fperry
New Contributor III

Hi everyone,

I'm working with Databricks structured streaming and have encountered an issue with stateful operations. Below is my pseudo-code:

 

df = df.withWatermark("timestamp", "1 second")

df_header = df.withColumn("message_id", F.col("payload.id"))

df_values = df.withColumn("message_id", F.col("payload.id")) \
              .withColumn("values_exploded", F.explode("payload.values")) \
              .withColumn("name", F.col("values_exploded.name")) \
              .groupBy(F.window(F.col("timestamp"), "10 seconds"), F.col("message_id"), F.col("name")) \
              .agg(F.collect_list("values_exploded").alias("values_grouped"))

...

df_values_grouped = df_values.groupBy(F.window(df_values.window, "10 seconds"), F.col("message_id")) \
                             .agg(F.collect_list(F.struct("*")).alias("values"))

final_df = df_header.join(df_values_grouped, "message_id", "inner")

 

From my understanding, it should be possible to do multiple stateful operations in Spark/Databricks since the 13.1/3.5.0 release. However, I am getting the following error:

Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operations which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded.

Why am I getting this error, and how can I fix it?

5 REPLIES 5

Alberto_Umana
Databricks Employee
Databricks Employee

Hello @fperry,

This error occurs because the query contains stateful operations that can emit rows older than the current watermark plus the allowed late record delay. These rows are considered "late rows" in downstream stateful operations and can be discarded. You might need to adjust the watermark duration or the allowed late record delay to accommodate the lateness of your data. This can help prevent the discarding of late rows.

If you understand the risks and still need to run the query, you can disable the correctness check by setting the following configuration:

 

spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")

However, this should be done with caution as it can lead to potential correctness issues in your streaming application

fperry
New Contributor III

Hello @Alberto_Umana,

Thank you for your reply. Can you maybe explain how adjusting the watermark duration would fix this issue? I just tested it with a duration of 10 minutes and left everything else the same. However, I'm still facing the same error.

Alberto_Umana
Databricks Employee
Databricks Employee

Hi @fperry,

Which DBR version are you using? Could you please try with Databricks Runtime 13.3 LTS?

fperry
New Contributor III

Hi @Alberto_Umana,

I'm using DATABRICKS_RUNTIME_VERSION: 16.1

fperry
New Contributor III

This should according to this blog post basically work, right? However, I'm getting the same error
Multiple Stateful Streaming Operators | Databricks Blog

Or am I missing something?

 

rate_df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

rate_df = rate_df.withWatermark("timestamp", "2 seconds")

# display(rate_df)

counts1 = rate_df.groupBy(F.window("timestamp", "10 seconds")).count()

counts2 = counts1.groupBy(F.window(F.window_time(counts1.window), "20 seconds")).count()

display(counts2)