04-28-2025 09:59 AM - edited 04-28-2025 10:00 AM
Hi @PreetiB,
Pushing late-arriving data into a Dead Letter Queue (DLQ) is a very common pattern in PySpark Structured Streaming, especially in real-time pipelines. Here is one way to set it up:
1. Set a watermark on your stream to define what counts as "late data."
Example:
df_with_event_time = df.withWatermark("event_time_column", "5 minutes")
2. Split the stream into:
- On-time data
- Late data
You can do this manually, by comparing each event time against the current time minus the watermark delay.
Example:
from pyspark.sql import functions as F

WATERMARK_DELAY_MINUTES = 5
current_time = F.current_timestamp()

# Create a flag to identify late data
df_with_flags = df_with_event_time.withColumn(
    "is_late",
    F.when(
        F.col("event_time_column") < (current_time - F.expr(f"INTERVAL {WATERMARK_DELAY_MINUTES} minutes")),
        F.lit(True)
    ).otherwise(F.lit(False))
)
3. Filter into two DataFrames:
# Good data
df_on_time = df_with_flags.filter(~F.col("is_late"))
# Late data to push to DLQ
df_late = df_with_flags.filter(F.col("is_late"))
4. Write late data to DLQ (example: to a separate Delta table, or a blob location):
Example:
dlq_path = "/mnt/dlq/late_data/"

late_query = (
    df_late.writeStream
    .format("delta")  # or "parquet", "json"
    .option("checkpointLocation", "/mnt/checkpoints/late_data/")
    .outputMode("append")
    .start(dlq_path)
)
Now late data is saved safely for further analysis without poisoning your main pipeline.
5. Write good data normally to your trusted sink:
Example:
good_query = (
    df_on_time.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/good_data/")
    .outputMode("append")
    .start("/mnt/good_data/")
)
If you want too-late data dropped silently instead, note that Spark already does this automatically once a watermark is combined with a stateful operation (windowed aggregation, stream-stream join, dropDuplicates): events older than the watermark never reach the operator's state, so no extra option is needed.
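As a minimal sketch of that automatic dropping (column names and paths follow the examples above and are placeholders):

from pyspark.sql import functions as F

# Rows whose event_time_column is older than (max event time seen - 5 minutes)
# are dropped by Spark before they reach the aggregation state.
windowed_counts = (
    df.withWatermark("event_time_column", "5 minutes")
    .groupBy(F.window("event_time_column", "10 minutes"))
    .count()
)

windowed_query = (
    windowed_counts.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/windowed_counts/")
    .outputMode("append")
    .start("/mnt/windowed_counts/")
)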
04-30-2025 09:25 AM
Filtering late data based on the current timestamp is not acceptable, because the watermark is computed from event time: the maximum event timestamp seen across batches minus the delay threshold, not from processing time.
So my objective is to identify late data based on the actual watermark value.
Currently I am struggling to get the watermark value inside the application so that I can filter late data.
We can get the watermark value from a StreamingQueryListener, but we cannot do heavy processing like filtering inside the listener, as it raises insufficient-memory issues.
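For reference, the value in question is the one Spark reports in the streaming progress payload (query below is the running StreamingQuery handle):

progress = query.lastProgress  # most recent progress as a dict; None before the first batch
if progress and progress.get("eventTime"):
    current_watermark = progress["eventTime"].get("watermark")
    # ISO-8601 string such as "2025-04-30T09:20:00.000Z";
    # absent until the watermark first advances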
04-30-2025 01:36 PM
Watermarking in Spark Structured Streaming defines how late an event can arrive before it is considered "too late," but it doesn't directly expose the current watermark value for use within your main processing logic. This makes it tricky when you want to filter late data based on the watermark value.
You're also correct that using a StreamingQueryListener to capture the watermark value and then doing heavy processing in it is problematic due to memory and architecture limitations (listeners are for monitoring, not processing).
Because watermarking is handled internally by Spark, there's no clean public API to pull the watermark into transformations directly. The workaround, using a side channel (temp view, broadcast, external KV store), is the safest way to make that value accessible within your transformation logic; see the sketch below.
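A minimal sketch of one way to wire that side channel, assuming Spark 3.4+ / a recent Databricks runtime where the Python StreamingQueryListener is available (paths and column names are placeholders): the listener only records the watermark string, and all filtering happens inside foreachBatch:

import threading
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryListener

# Lightweight side channel: the listener only stores the latest watermark string;
# no filtering or other heavy work happens in the callback.
class WatermarkCapture(StreamingQueryListener):
    def __init__(self):
        self._lock = threading.Lock()
        self.watermark = None  # ISO-8601 string once the watermark advances

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        wm = event.progress.eventTime.get("watermark")
        if wm:
            with self._lock:
                self.watermark = wm

    def onQueryTerminated(self, event):
        pass

capture = WatermarkCapture()
spark.streams.addListener(capture)

def route_by_watermark(batch_df, batch_id):
    wm = capture.watermark  # None until the first progress event arrives
    if wm is None:
        batch_df.write.format("delta").mode("append").save("/mnt/good_data/")
        return
    wm_ts = F.lit(wm).cast("timestamp")
    # On-time rows go to the trusted sink, late rows to the DLQ
    batch_df.filter(F.col("event_time_column") >= wm_ts) \
        .write.format("delta").mode("append").save("/mnt/good_data/")
    batch_df.filter(F.col("event_time_column") < wm_ts) \
        .write.format("delta").mode("append").save("/mnt/dlq/late_data/")

routed_query = (
    df_with_event_time.writeStream
    .foreachBatch(route_by_watermark)
    .option("checkpointLocation", "/mnt/checkpoints/routed/")
    .start()
)

One caveat: listener callbacks fire asynchronously after each micro-batch, so the watermark seen by batch N is at best the one reported for batch N-1. That matches Spark's own semantics, since the watermark only advances between micro-batches.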
05-03-2025 12:19 AM
Hi LRALVA,
Can you please describe the workaround in detail so that I can implement it?
04-30-2025 09:27 AM
Thanks for your response. I also want to add the following problem that I'm currently facing:
04-28-2025 10:52 AM
Windowing/Watermarking is your friend here!