04-28-2025 09:59 AM - edited 04-28-2025 10:00 AM
Hi @PreetiB
Pushing late-arriving data into a Dead Letter Queue (DLQ) is a common pattern in PySpark Structured Streaming, especially in real-time pipelines. One way to set it up:
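1. Set a watermark on your stream to define "late data."
Example:
```python
df_with_event_time = df.withWatermark("event_time_column", "5 minutes")
```
Keep in mind that the watermark only drops late rows inside stateful operations (windowed aggregations, stream-stream joins, `dropDuplicates`). A plain `filter` or `select` passes late rows through untouched, and the current watermark value is not readily available as a column you can filter on. That is why late rows are flagged manually in the next step.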
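To make the watermark semantics concrete, here is a small pure-Python simulation (not Spark code; `simulate_watermark` and the sample timestamps are hypothetical). It follows the rule described in the Structured Streaming guide: the watermark trails the maximum event time seen so far by the configured delay, and rows older than it are the ones a stateful operator would treat as too late. The model is simplified, for example it ignores the exact `<` vs `<=` boundary.

```python
from datetime import datetime, timedelta


def simulate_watermark(batches, delay):
    """Replay micro-batches of event times and report, for each batch,
    the watermark in effect and the rows that arrive behind it.

    Simplified model: after a micro-batch finishes, the watermark becomes
    max(event time seen so far) - delay, never moves backwards, and takes
    effect from the next micro-batch.
    """
    watermark = None        # no watermark until the first batch has run
    max_event_time = None
    report = []
    for batch in batches:
        behind = [t for t in batch if watermark is not None and t < watermark]
        report.append((watermark, behind))
        if batch:
            batch_max = max(batch)
            # Track the max event time across all batches, so the
            # watermark can only move forward.
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return report


# Hypothetical event times, grouped into four micro-batches.
t0 = datetime(2025, 4, 28, 10, 0)
batches = [
    [t0 + timedelta(minutes=10), t0 + timedelta(minutes=12)],
    [t0 + timedelta(minutes=5), t0 + timedelta(minutes=9)],
    [t0 + timedelta(minutes=20)],
    [t0 + timedelta(minutes=14)],
]

for wm, behind in simulate_watermark(batches, timedelta(minutes=5)):
    print(wm, [t.strftime("%H:%M") for t in behind])
```

With a 5-minute delay, the 10:05 event in the second batch and the 10:14 event in the last batch fall behind the watermark (10:07 and 10:15 respectively), while the 10:09 event is still accepted even though it arrived after 10:12.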
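2. Split the stream into on-time data and late data.
Because the watermark isn't available per row, approximate it manually: flag a row as late when its event time is older than the current processing time minus the same delay you used for the watermark. This is only an approximation. Spark's watermark is driven by the maximum event time seen, not by the wall clock, so the two can disagree (for example, when replaying or backfilling old data, this rule flags everything as late).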
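Example:
```python
from pyspark.sql import functions as F

WATERMARK_DELAY_MINUTES = 5  # keep in sync with the withWatermark delay
# In a streaming query, current_timestamp() is evaluated once per micro-batch
current_time = F.current_timestamp()

# Flag rows whose event time is older than (now - delay).
# A NULL event time makes the comparison NULL, so when() falls through to False.
df_with_flags = df_with_event_time.withColumn(
    "is_late",
    F.when(
        F.col("event_time_column")
        < (current_time - F.expr(f"INTERVAL {WATERMARK_DELAY_MINUTES} minutes")),
        F.lit(True),
    ).otherwise(F.lit(False)),
)
```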
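The flag above compares event time with the wall clock, whereas Spark's watermark trails the maximum event time seen. The pure-Python comparison below (not Spark code; the function names and timestamps are hypothetical, and micro-batch boundaries are ignored) shows how far apart the two rules can drift when an hour-old backlog is replayed:

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=5)  # mirrors WATERMARK_DELAY_MINUTES = 5


def is_late_processing_time(event_time, now, delay=DELAY):
    """Rule used by the is_late flag: older than (now - delay).
    None stands in for a NULL event time, which the F.when(...)
    expression sends to otherwise(False)."""
    if event_time is None:
        return False
    return event_time < now - delay


def is_late_event_time(event_time, max_event_time_seen, delay=DELAY):
    """Watermark-style rule: older than (max event time seen - delay)."""
    return event_time < max_event_time_seen - delay


# Hypothetical backfill: events from 11:00-11:10 replayed at 12:00.
now = datetime(2025, 4, 28, 12, 0)
backfill = [datetime(2025, 4, 28, 11, m) for m in (0, 4, 8, 10)]
max_seen = max(backfill)  # 11:10, so the watermark-style cutoff is 11:05

processing_flags = [is_late_processing_time(t, now) for t in backfill]
event_flags = [is_late_event_time(t, max_seen) for t in backfill]

print(processing_flags)  # every row flagged late
print(event_flags)       # only rows before 11:05 flagged late
```

If you ever reprocess historical data, the processing-time flag would route the entire backlog to the DLQ, so it is worth deciding up front which definition of "late" the pipeline needs.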
3. Filter into two DataFrames:
```python
# Good data
df_on_time = df_with_flags.filter(~F.col("is_late"))

# Late data to push to the DLQ
df_late = df_with_flags.filter(F.col("is_late"))
```
4. Write the late data to the DLQ (for example, a separate Delta table or a blob storage location):
Example:
```python
dlq_path = "/mnt/dlq/late_data/"

late_query = (
    df_late.writeStream
    .format("delta")  # or "parquet", "json"
    .option("checkpointLocation", "/mnt/checkpoints/late_data/")
    .outputMode("append")
    .start(dlq_path)
)
```
✅ Late data is now stored separately for later analysis instead of polluting your main pipeline.
5. Write the good data to your trusted sink as usual:
Example:
```python
good_query = (
    df_on_time.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/good_data/")
    .outputMode("append")
    .start("/mnt/good_data/")
)
```
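One caveat with steps 4 and 5: `late_query` and `good_query` are two independent streaming queries, so each reads the source separately and evaluates `current_timestamp()` at its own micro-batch time. A row close to the 5-minute boundary can then be classified as late by one query and on-time by the other, so it lands in both sinks or in neither. The pure-Python sketch below (not Spark code; all names and timestamps are hypothetical) reproduces that race and contrasts it with classifying each row once. In Spark, the single-classification version corresponds to one query using `foreachBatch`, whose handler receives each micro-batch as a regular DataFrame and can write both the late and on-time subsets from it (persisting the batch first, as the Spark docs suggest when writing to multiple locations).

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=5)  # same delay as WATERMARK_DELAY_MINUTES


def is_late(event_time, now, delay=DELAY):
    """Processing-time rule from step 2: late if older than now - delay."""
    return event_time < now - delay


def two_query_routing(event_time, late_query_now, good_query_now):
    """Two independent queries: each sink re-evaluates the rule with its own
    micro-batch timestamp. Returns (written_to_dlq, written_to_good_sink)."""
    written_to_dlq = is_late(event_time, late_query_now)
    written_to_good = not is_late(event_time, good_query_now)
    return written_to_dlq, written_to_good


def single_pass_routing(event_time, now):
    """One classification per row (what a foreachBatch handler would do):
    the row goes to exactly one sink."""
    late = is_late(event_time, now)
    return late, not late


event = datetime(2025, 4, 28, 10, 0, 0)
earlier = datetime(2025, 4, 28, 10, 4, 30)  # cutoff 09:59:30, row looks on time
later = datetime(2025, 4, 28, 10, 5, 30)    # cutoff 10:00:30, row looks late

print(two_query_routing(event, later, earlier))   # written to both sinks
print(two_query_routing(event, earlier, later))   # written to neither sink
print(single_pass_routing(event, later))          # exactly one sink
```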
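If you just want too-late data dropped silently instead of sent to a DLQ, the watermark already does that inside stateful operators: windowed aggregations, stream-stream joins, and `dropDuplicates` discard rows that arrive behind it, while stateless operations such as `filter` do not.
Note that `dropLateData` does not appear to be a documented option for Structured Streaming sources, including Auto Loader and Kafka on Databricks, so check your runtime's documentation before relying on it.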
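When late rows are dropped by a stateful operator, you can still see how many: in recent Spark versions, each entry under `stateOperators` in the streaming query progress reports `numRowsDroppedByWatermark`. The helper below totals it from one progress report. It is a sketch: the function name and the trimmed-down sample report are hypothetical, and it assumes the plain-dict form that `query.lastProgress` returns in PySpark 3.x (which is `None` before the first batch completes).

```python
def rows_dropped_by_watermark(progress):
    """Sum numRowsDroppedByWatermark across the stateful operators in one
    progress report (dict form of query.lastProgress in PySpark 3.x)."""
    if not progress:  # lastProgress is None before the first batch
        return 0
    return sum(
        op.get("numRowsDroppedByWatermark", 0)
        for op in progress.get("stateOperators", [])
    )


# Hypothetical, trimmed-down progress report for illustration only.
sample_progress = {
    "batchId": 42,
    "numInputRows": 1000,
    "stateOperators": [
        {"numRowsTotal": 350, "numRowsUpdated": 120, "numRowsDroppedByWatermark": 7},
    ],
}

print(rows_dropped_by_watermark(sample_progress))  # 7
```

In a real pipeline you would pass `query.lastProgress` (or each entry of `query.recentProgress`) instead of the sample dict.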