
How to push late data to a DLQ in PySpark Structured Streaming?

PreetiB
New Contributor II
 
6 REPLIES

lingareddy_Alva
Honored Contributor II

Hi @PreetiB 

Pushing late-arriving data into a Dead Letter Queue (DLQ) is very common in PySpark Structured Streaming, especially in real-time pipelines.

1. Set a watermark on your stream to define "late data."

Example:
df_with_event_time = df.withWatermark("event_time_column", "5 minutes")


2. Split the stream into:
On-time data
Late data
You can approximate this manually by comparing each event's time against the current processing time minus the allowed delay.
Example:
from pyspark.sql import functions as F

WATERMARK_DELAY_MINUTES = 5

current_time = F.current_timestamp()

# Flag events that arrived later than the allowed delay
# (approximating the watermark with processing time)
df_with_flags = df_with_event_time.withColumn(
    "is_late",
    F.when(
        F.col("event_time_column") < (current_time - F.expr(f"INTERVAL {WATERMARK_DELAY_MINUTES} minutes")),
        F.lit(True)
    ).otherwise(F.lit(False))
)

 

3. Filter into two DataFrames:
# Good data
df_on_time = df_with_flags.filter(~F.col("is_late"))
# Late data to push to DLQ
df_late = df_with_flags.filter(F.col("is_late"))

 

4. Write late data to DLQ (example: to a separate Delta table, or a blob location):
Example:
dlq_path = "/mnt/dlq/late_data/"

late_query = (
    df_late.writeStream
    .format("delta")  # or "parquet", "json"
    .option("checkpointLocation", "/mnt/checkpoints/late_data/")
    .outputMode("append")
    .start(dlq_path)
)

✅ Now late data is saved safely for further analysis without poisoning your main pipeline.

5. Write good data normally to your trusted sink:
Example:
good_query = (
    df_on_time.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/good_data/")
    .outputMode("append")
    .start("/mnt/good_data/")
)

 

If you just want too-late data dropped silently, you do not need a separate option: once a watermark is set, any stateful operation downstream (windowed aggregation, stream-stream join, dropDuplicates) automatically discards events that fall behind the watermark. The explicit split above is only needed when you want to capture those rows in a DLQ instead of losing them.
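For example, a minimal sketch of that automatic behaviour (assuming a streaming DataFrame named events with an event_time_column; both names are placeholders):
from pyspark.sql import functions as F

# With a watermark plus a stateful aggregation, Spark itself discards events
# that arrive later than (max event time seen so far) - (5 minutes).
windowed_counts = (
    events
    .withWatermark("event_time_column", "5 minutes")
    .groupBy(F.window("event_time_column", "10 minutes"))
    .count()
)

windowed_query = (
    windowed_counts.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/windowed/")
    .outputMode("append")
    .start("/mnt/windowed_counts/")
)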

LR

Filtering late data based on the current timestamp is not acceptable, because the watermark is computed as the maximum event timestamp seen in the processed batches minus the delay threshold, not as processing time minus the threshold.

So my objective is to identify late data based on the actual watermark value.

Currently I am struggling to get the watermark value inside the application so that I can use it to filter late data.

We can get the watermark value from the StreamingQueryListener progress events, but we cannot do heavy processing like filtering inside the listener, as it raises insufficient-memory issues.
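For reference, this is roughly how the watermark can be read from the progress events (a minimal sketch, assuming Spark 3.4+ where StreamingQueryListener is exposed in PySpark and the query already has a watermark defined; the listener only records the value and does no filtering):
from pyspark.sql.streaming import StreamingQueryListener

class WatermarkListener(StreamingQueryListener):
    # Keep the callback write-only: record the last reported watermark, nothing else.
    def __init__(self):
        self.current_watermark = None

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # progress.eventTime holds the event-time stats reported for each micro-batch
        wm = event.progress.eventTime.get("watermark")
        if wm:
            self.current_watermark = wm

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

listener = WatermarkListener()
spark.streams.addListener(listener)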

lingareddy_Alva
Honored Contributor II

@PreetiB 

Watermarking in Spark Structured Streaming defines how late an event can arrive before it's considered "too late,"
but it doesn't directly expose the current watermark value to use within your main processing logic.
This makes it tricky when you want to filter late data based on the watermark value.

You're also correct that using StreamingQueryListener to capture the watermark value and then doing heavy processing in
it is problematic due to memory and architecture limitations (listeners are for monitoring, not processing).

Because watermarking is handled internally by Spark, there's no clean public API to pull the watermark into transformations directly.
The workaround using a side channel (temp view, broadcast, external KV store) is the safest way to make that value accessible within your transformation logic.
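As a rough sketch of that side-channel pattern (assuming a driver-side listener like the one sketched above that stores the last reported watermark, plus placeholder paths and column names), keep the listener write-only and do the actual routing in foreachBatch, which runs on the driver and can read the captured value for every micro-batch:
from pyspark.sql import functions as F

def route_batch(batch_df, batch_id):
    # Side channel: the last watermark captured by the listener on the driver.
    wm = listener.current_watermark  # ISO-8601 string, or None before the first progress event
    if wm is None:
        batch_df.write.format("delta").mode("append").save("/mnt/good_data/")
        return
    wm_ts = F.to_timestamp(F.lit(wm))
    late = batch_df.filter(F.col("event_time_column") < wm_ts)
    on_time = batch_df.filter(F.col("event_time_column") >= wm_ts)
    late.write.format("delta").mode("append").save("/mnt/dlq/late_data/")
    on_time.write.format("delta").mode("append").save("/mnt/good_data/")

routed_query = (
    df_with_event_time.writeStream
    .foreachBatch(route_batch)
    .option("checkpointLocation", "/mnt/checkpoints/routed/")
    .start()
)

An external key-value store or a small Delta table works the same way if the listener and the query run in different processes.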

LR

Hi LRALVA,

Can you please describe the workaround in detail so that I can implement it?

Thanks for your response. I also want to add the problem below that I am currently facing.

BigRoux
Databricks Employee

Windowing/Watermarking is your friend here 🙂
