Issue with Multiple Stateful Operations in Databricks Structured Streaming

fperry

Hi everyone,

I'm working with Structured Streaming on Databricks and have run into an issue with stateful operations. Below is my pseudo-code:

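from pyspark.sql import functions as F

# Declare an event-time watermark on "timestamp" with a 1-second delay threshold
df = df.withWatermark("timestamp", "1 second")

# Header stream: one row per incoming message, keyed by message_id
df_header = df.withColumn("message_id", F.col("payload.id"))

# Stateful operation 1: explode the per-message values and collect them
# per 10-second event-time window, message_id and value name
df_values = df.withColumn("message_id", F.col("payload.id")) \
              .withColumn("values_exploded", F.explode("payload.values")) \
              .withColumn("name", F.col("values_exploded.name")) \
              .groupBy(F.window(F.col("timestamp"), "10 seconds"), F.col("message_id"), F.col("name")) \
              .agg(F.collect_list("values_exploded").alias("values_grouped"))

...

# Stateful operation 2: re-aggregate per message, using the window column
# produced by the first aggregation as the event-time column
df_values_grouped = df_values.groupBy(F.window(df_values.window, "10 seconds"), F.col("message_id")) \
                             .agg(F.collect_list(F.struct("*")).alias("values"))

# Stateful operation 3: stream-stream inner join of header and grouped values
final_df = df_header.join(df_values_grouped, "message_id", "inner")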
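
The second groupBy in the pseudo-code passes the first aggregation's window column to F.window() instead of a timestamp. In the chaining pattern described in the Spark Structured Streaming programming guide, the event time of an aggregated window row is window.end minus one microsecond (the value window_time() returns), and that is the time the second window() call buckets. The plain-Python model below illustrates that rule under stated assumptions: tumbling windows aligned to the Unix epoch in UTC (no startTime offset), and hypothetical helper names tumbling_window and window_time_of, which are not Spark APIs.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Hypothetical helpers: a plain-Python model of tumbling-window assignment,
# assuming windows aligned to the Unix epoch in UTC (no startTime offset).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window of length `size` that contains ts."""
    start = ts - (ts - EPOCH) % size
    return start, start + size


def window_time_of(window: Tuple[datetime, datetime]) -> datetime:
    """Event time of an aggregated window row: window end minus one microsecond."""
    _, end = window
    return end - timedelta(microseconds=1)


ten_seconds = timedelta(seconds=10)
event = datetime(2024, 1, 1, 12, 0, 7, tzinfo=timezone.utc)

# Stage 1: the raw event timestamp is bucketed into a 10-second window.
first = tumbling_window(event, ten_seconds)

# Stage 2: the stage-1 window is re-bucketed through its derived event time.
second = tumbling_window(window_time_of(first), ten_seconds)

print(first[0].time(), first[1].time())  # → 12:00:00 12:00:10
print(second == first)                   # → True
```

With a one-minute second stage, tumbling_window(window_time_of(first), timedelta(minutes=1)) returns the 12:00:00 to 12:01:00 window instead, which is how a 10-second result would be rolled up into a coarser window.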

 

From my understanding, it should be possible to run multiple stateful operations in one streaming query since Databricks Runtime 13.1 / Apache Spark 3.5.0. However, I am getting the following error:

Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operations which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded.
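
The hazard the message describes can be shown with a deliberately simplified toy model in plain Python (not Spark's implementation). One global watermark is shared by an upstream 10-second window count and a downstream stateful operator. In append mode the upstream operator only emits a window once the watermark has passed the window's end, so each emitted row's event time (modelled here as end minus 1 ms) is already older than that same watermark, and a downstream operator that drops rows at or below the watermark discards all of them. The function name, millisecond timestamps and batch layout are assumptions for illustration only.

```python
from collections import defaultdict

# Toy model only: names, millisecond timestamps and batch layout are assumptions.
WINDOW_MS = 10_000  # 10-second tumbling window, as in the question
DELAY_MS = 1_000    # 1-second delay, as in withWatermark("timestamp", "1 second")


def simulate_global_watermark(batches):
    """Run micro-batches of event times (ms) through an upstream window count and a
    downstream late-row filter that share ONE watermark. Returns (kept, dropped),
    each a list of (window_start_ms, count) rows emitted by the upstream operator."""
    watermark = None       # no watermark until the first batch has been seen
    max_event_time = None
    open_windows = defaultdict(int)  # upstream state: window start -> event count
    kept, dropped = [], []

    for batch in batches:
        for ts in batch:
            # Upstream operator ignores input at or below the current watermark.
            if watermark is None or ts > watermark:
                open_windows[ts - ts % WINDOW_MS] += 1
            max_event_time = ts if max_event_time is None else max(max_event_time, ts)

        # The single global watermark advances once per batch.
        watermark = max_event_time - DELAY_MS

        # Append mode: finalize and emit every window whose end is <= watermark.
        for start in sorted(open_windows):
            end = start + WINDOW_MS
            if end > watermark:
                continue
            row = (start, open_windows.pop(start))
            emitted_event_time = end - 1  # toy stand-in for "window end minus 1 microsecond"
            # Downstream stateful operator filters late rows with the SAME watermark.
            if emitted_event_time <= watermark:
                dropped.append(row)
            else:
                kept.append(row)
    return kept, dropped


kept, dropped = simulate_global_watermark([[1_000, 4_000, 9_500], [12_000, 15_000], [23_000]])
print("kept:", kept)        # → kept: []
print("dropped:", dropped)  # → dropped: [(0, 3), (10000, 2)]
```

Because an emitted window's end is never later than the watermark that released it, the kept branch can never fire in this model. That is the "late rows" pattern the error message refers to; the sketch does not say which part of the query above Spark actually flagged.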

Why am I getting this error, and how can I fix it?