lingareddy_Alva

Hi @PreetiB 

Pushing late-arriving data into a Dead Letter Queue (DLQ) is a common pattern in PySpark Structured Streaming, especially in real-time pipelines.

1. Set a watermark on your stream to define "late data".

Example:
df_with_event_time = df.withWatermark("event_time_column", "5 minutes")
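
For reference, Spark derives the watermark from the data itself. It tracks the newest event time seen so far and, at the end of each micro-batch, sets the watermark to that maximum minus the delay (5 minutes here), so the watermark never moves backwards. The sketch below is a simplified pure-Python model of that rule, not Spark code. simulate_watermark is a hypothetical helper, and the model ignores details such as multiple input streams and exact boundary handling.

```python
from datetime import datetime, timedelta


def simulate_watermark(batches, delay):
    """Simplified model of Spark's event-time watermark (hypothetical helper).

    For each micro-batch, returns the watermark in effect while that batch ran
    and the event times in the batch that were older than it.
    """
    max_event_time = None  # newest event time seen so far
    watermark = None       # no watermark until the first batch has been processed
    results = []
    for batch in batches:
        late = [t for t in batch if watermark is not None and t < watermark]
        results.append((watermark, late))
        # The watermark only advances after the batch completes.
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return results


def m(minutes):
    # Event time at 12:00 plus the given number of minutes.
    return datetime(2024, 1, 1, 12, 0) + timedelta(minutes=minutes)


batches = [[m(10), m(12)], [m(3), m(11)], [m(20)], [m(14)]]
for wm, late in simulate_watermark(batches, timedelta(minutes=5)):
    print(wm, late)
```

With a 5-minute delay, the 12:03 event in the second batch is behind the 12:07 watermark and the 12:14 event in the fourth batch is behind the 12:15 watermark; everything else is on time.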


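2. Split the stream into:
On-time data
Late data
Spark doesn't expose the current watermark as a column, and withWatermark only drops rows inside stateful operators (windowed aggregations, dropDuplicates, stream-stream joins). In a stateless pipeline like this one, you have to flag late rows yourself. The example below approximates the watermark by comparing each row's event time with the micro-batch's processing time (current_timestamp()) minus the same delay.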
Example:
from pyspark.sql import functions as F

WATERMARK_DELAY_MINUTES = 5

# Processing time of the current micro-batch, used as a proxy for the watermark
current_time = F.current_timestamp()

# Create a flag to identify late data (a null event time falls through to otherwise(False))
df_with_flags = df_with_event_time.withColumn(
    "is_late",
    F.when(
        F.col("event_time_column") < (current_time - F.expr(f"INTERVAL {WATERMARK_DELAY_MINUTES} minutes")),
        F.lit(True)
    ).otherwise(F.lit(False))
)
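
Note that is_late measures lateness against processing time (the micro-batch timestamp), not against the watermark, and the two can disagree badly. After downtime, a backlog of perfectly ordered events is more than 5 minutes behind the clock, so the processing-time rule sends all of it to the DLQ, while a watermark-style rule would not. The pure-Python sketch below contrasts the two rules; late_by_processing_time and late_by_watermark are hypothetical helpers, not Spark APIs.

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=5)


def late_by_processing_time(event_time, batch_time, delay=DELAY):
    """Same rule as the is_late column: event time vs. the micro-batch timestamp."""
    if event_time is None:
        return False  # a null comparison falls through to otherwise(False)
    return event_time < batch_time - delay


def late_by_watermark(event_time, max_event_time_seen, delay=DELAY):
    """Watermark-style rule: event time vs. the newest event time seen so far."""
    if event_time is None or max_event_time_seen is None:
        return False
    return event_time < max_event_time_seen - delay


# Backlog after an hour of downtime: ordered events from 12:00 to 12:02, processed at 13:00.
batch_time = datetime(2024, 1, 1, 13, 0)
backlog = [datetime(2024, 1, 1, 12, minute) for minute in range(3)]
newest = max(backlog)
print([late_by_processing_time(t, batch_time) for t in backlog])
print([late_by_watermark(t, newest) for t in backlog])
```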

3. Filter into two DataFrames:
# Good data
df_on_time = df_with_flags.filter(~F.col("is_late"))
# Late data to push to DLQ
df_late = df_with_flags.filter(F.col("is_late"))

4. Write late data to the DLQ (for example, a separate Delta table or a blob storage location):
Example:
dlq_path = "/mnt/dlq/late_data/"

late_query = (
    df_late.writeStream
    .format("delta")  # or "parquet", "json"
    .option("checkpointLocation", "/mnt/checkpoints/late_data/")
    .outputMode("append")
    .start(dlq_path)
)

Now late data is saved safely for further analysis without poisoning your main pipeline.

5. Write good data normally to your trusted sink:
Example:
good_query = (
    df_on_time.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/good_data/")
    .outputMode("append")
    .start("/mnt/good_data/")
)

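If you would rather drop too-late data silently instead of keeping it in a DLQ, there is no separate option to enable: neither Auto Loader nor the Kafka source has a dropLateData setting. The watermark does this for you. Once it is set, stateful operators such as windowed aggregations, dropDuplicates, and stream-stream joins discard rows that arrive behind the watermark, and the number of discarded rows is reported as numRowsDroppedByWatermark in the query's progress metrics.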
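
To illustrate that silent dropping, here is a simplified pure-Python model (not Spark code) of a 10-minute tumbling-window count with a 5-minute watermark. windowed_count and window_start are hypothetical helpers. The model assumes, roughly as Spark does for windowed aggregations, that a row is discarded once its window's end is at or before the current watermark, which is why a row slightly behind the watermark can still be counted while its window is open.

```python
from collections import Counter
from datetime import datetime, timedelta


def window_start(t, window):
    # Floor t to the start of its tumbling window (windows aligned to midnight).
    midnight = t.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + ((t - midnight) // window) * window


def windowed_count(batches, window, delay):
    counts = Counter()    # window start -> number of rows aggregated
    dropped = []          # rows silently discarded as too late
    max_event_time = None
    watermark = None      # no watermark until the first batch has been processed
    for batch in batches:
        for t in batch:
            start = window_start(t, window)
            if watermark is not None and start + window <= watermark:
                dropped.append(t)  # window already finalized: row is ignored
            else:
                counts[start] += 1
        # Advance the watermark after the batch, as in the earlier model.
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return counts, dropped


def at(minute):
    # Event time at 12:00 plus the given number of minutes.
    return datetime(2024, 1, 1, 12, 0) + timedelta(minutes=minute)


batches = [[at(2), at(7), at(13)], [at(4), at(16)], [at(9), at(24)], [at(8)]]
counts, dropped = windowed_count(batches, timedelta(minutes=10), timedelta(minutes=5))
print(sorted(counts.items()))
print(dropped)
```

In the second batch, the 12:04 row is already behind the 12:08 watermark but is still counted because its 12:00 to 12:10 window is open. By the third batch the watermark has reached 12:11, so the 12:09 and 12:08 rows are dropped and never reach any sink.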

LR