<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to push late data to DLQ in pyspark structured streaming ? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116827#M45378</link>
    <description>&lt;P&gt;Windowing/Watermarking is your friend here &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 28 Apr 2025 17:52:54 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-04-28T17:52:54Z</dc:date>
    <item>
      <title>How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116758#M45369</link>
      <description />
      <pubDate>Mon, 28 Apr 2025 12:13:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116758#M45369</guid>
      <dc:creator>PreetiB</dc:creator>
      <dc:date>2025-04-28T12:13:12Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116823#M45377</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/162330"&gt;@PreetiB&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Pushing late-arriving data into a Dead Letter Queue (DLQ) is a common pattern in PySpark Structured Streaming, especially in real-time pipelines.&lt;/P&gt;&lt;P&gt;1. Set a watermark on your stream to define "late data."&lt;/P&gt;&lt;P&gt;Example:&lt;BR /&gt;df_with_event_time = df.withWatermark("event_time_column", "5 minutes")&lt;/P&gt;&lt;P&gt;2. Split the stream into on-time data and late data.&lt;BR /&gt;withWatermark does not add a column or filter rows by itself, so late rows have to be flagged manually. The example below compares each event time with the current processing time minus the delay. This only approximates the watermark, which Spark derives from the maximum event time it has seen.&lt;BR /&gt;Example:&lt;BR /&gt;from pyspark.sql import functions as F&lt;/P&gt;&lt;P&gt;WATERMARK_DELAY_MINUTES = 5&lt;/P&gt;&lt;P&gt;current_time = F.current_timestamp()&lt;/P&gt;&lt;P&gt;# Flag rows whose event time is older than (processing time - delay)&lt;BR /&gt;df_with_flags = df_with_event_time.withColumn(&lt;BR /&gt;"is_late",&lt;BR /&gt;F.when(&lt;BR /&gt;F.col("event_time_column") &amp;lt; (current_time - F.expr(f"INTERVAL {WATERMARK_DELAY_MINUTES} minutes")),&lt;BR /&gt;F.lit(True)&lt;BR /&gt;).otherwise(F.lit(False))&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;3. Filter into two DataFrames:&lt;BR /&gt;# Good data&lt;BR /&gt;df_on_time = df_with_flags.filter(~F.col("is_late"))&lt;BR /&gt;# Late data to push to the DLQ&lt;BR /&gt;df_late = df_with_flags.filter(F.col("is_late"))&lt;/P&gt;&lt;P&gt;4. Write late data to the DLQ (for example, a separate Delta table or a blob location):&lt;BR /&gt;Example:&lt;BR /&gt;dlq_path = "/mnt/dlq/late_data/"&lt;/P&gt;&lt;P&gt;late_query = (&lt;BR /&gt;df_late.writeStream&lt;BR /&gt;.format("delta") # or "parquet", "json"&lt;BR /&gt;.option("checkpointLocation", "/mnt/checkpoints/late_data/")&lt;BR /&gt;.outputMode("append")&lt;BR /&gt;.start(dlq_path)&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;&lt;span class="lia-unicode-emoji" title=":white_heavy_check_mark:"&gt;✅&lt;/span&gt; Late data is now saved for further analysis without poisoning your main pipeline.&lt;/P&gt;&lt;P&gt;5. Write good data to your trusted sink as usual, with its own checkpoint location:&lt;BR /&gt;Example:&lt;BR /&gt;good_query = (&lt;BR /&gt;df_on_time.writeStream&lt;BR /&gt;.format("delta")&lt;BR /&gt;.option("checkpointLocation", "/mnt/checkpoints/good_data/")&lt;BR /&gt;.outputMode("append")&lt;BR /&gt;.start("/mnt/good_data/")&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;If you only want too-late data dropped silently, no extra option should be needed: once a watermark is set, stateful operators (windowed aggregations, stream-stream joins, dropDuplicates) discard records older than the watermark on their own. I could not confirm a "dropLateData" option in the Spark or Databricks documentation, so please verify it before relying on it.&lt;/P&gt;</description>
      <pubDate>Mon, 28 Apr 2025 17:00:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116823#M45377</guid>
      <dc:creator>lingareddy_Alva</dc:creator>
      <dc:date>2025-04-28T17:00:24Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116827#M45378</link>
      <description>&lt;P&gt;Windowing/Watermarking is your friend here &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 28 Apr 2025 17:52:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/116827#M45378</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-04-28T17:52:54Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117180#M45444</link>
      <description>&lt;P&gt;Filtering late data against the current timestamp is not acceptable, because the watermark is computed as the maximum event timestamp seen so far minus the delay threshold.&lt;/P&gt;&lt;P&gt;So my objective is to identify late data based on the actual watermark value.&lt;/P&gt;&lt;P&gt;Currently I am struggling to get the watermark value inside the application so that I can filter late data.&lt;/P&gt;&lt;P&gt;We can get the watermark value from a StreamingQueryListener object, but we cannot do heavy processing such as filtering inside the listener, because it raises insufficient-memory errors.&lt;/P&gt;</description>
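      <content:encoded><![CDATA[
To make the rule in this post concrete, here is a minimal pure-Python sketch of "watermark = max event time seen - delay threshold". It is only an illustration of the rule, not Spark's implementation; the function name split_batches and the sample timestamps are made up for the example. It assumes the watermark is advanced after a micro-batch finishes and applied to the next one, and that it never moves backwards.

```python
from datetime import datetime, timedelta


def split_batches(batches, delay):
    """Route each event time to on_time or dlq using a max-event-time watermark."""
    max_event_time = None  # highest event time seen in completed batches
    watermark = None       # stays None until the first non-empty batch completes
    on_time, dlq = [], []
    for batch in batches:
        for event_time in batch:
            # Late means older than the watermark carried into this batch.
            if watermark is not None and event_time < watermark:
                dlq.append(event_time)
            else:
                on_time.append(event_time)
        # Advance the watermark only after the batch; never move it backwards.
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
    return on_time, dlq, watermark


base = datetime(2025, 4, 28, 12, 0)
batches = [
    [base, base + timedelta(minutes=10)],                         # 12:00, 12:10
    [base + timedelta(minutes=2), base + timedelta(minutes=8)],   # 12:02, 12:08
]
on_time, dlq, watermark = split_batches(batches, timedelta(minutes=5))
```

With a 5 minute delay, the first batch pushes the watermark to 12:05, so in the second batch the 12:02 event goes to the DLQ while 12:08 is kept. Wall-clock time never enters the decision, which is the difference from a current_timestamp() filter.
]]></content:encoded>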
      <pubDate>Wed, 30 Apr 2025 16:25:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117180#M45444</guid>
      <dc:creator>PreetiB</dc:creator>
      <dc:date>2025-04-30T16:25:06Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117182#M45446</link>
      <description>&lt;P&gt;Thanks for your response. I also want to add the problem below, which I am currently facing.&lt;/P&gt;</description>
      <pubDate>Wed, 30 Apr 2025 16:27:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117182#M45446</guid>
      <dc:creator>PreetiB</dc:creator>
      <dc:date>2025-04-30T16:27:44Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117212#M45454</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/162330"&gt;@PreetiB&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Watermarking in Spark Structured Streaming defines how late an event can arrive before it is considered "too late," but it does not directly expose the current watermark value for use within your main processing logic. This makes it tricky to filter late data based on the watermark value.&lt;/P&gt;&lt;P&gt;You are also correct that using StreamingQueryListener to capture the watermark value and then doing heavy processing in it is problematic, due to memory and architecture limitations (listeners are for monitoring, not processing).&lt;/P&gt;&lt;P&gt;Because watermarking is handled internally by Spark, there is no clean public API to pull the watermark into transformations directly. A workaround is to let the listener only record the watermark and publish it through a side channel (temp view, broadcast, external KV store); that is the safest way to make the value accessible within your transformation logic.&lt;/P&gt;</description>
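      <content:encoded><![CDATA[
The side-channel idea can be sketched in plain Python as follows. This is a hedged illustration, not a Spark API: WatermarkHolder, update_from_progress, and is_late are hypothetical names, and the progress dict is assumed to look like the eventTime section of the streaming query progress output, with the watermark as a UTC timestamp string. Please check the exact shape against your Spark version. The point is that the listener side only stores one small value, and the filtering happens elsewhere.

```python
import threading
from datetime import datetime, timezone


class WatermarkHolder:
    """Hypothetical side channel: keeps only the latest watermark value."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None

    def update_from_progress(self, progress):
        # Assumed shape: {"eventTime": {"watermark": "2025-04-30T16:20:00.000Z"}}
        raw = progress.get("eventTime", {}).get("watermark")
        if raw is None:
            return  # no watermark reported yet
        parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%fZ")
        with self._lock:
            self._value = parsed.replace(tzinfo=timezone.utc)

    def get(self):
        with self._lock:
            return self._value


def is_late(event_time, holder):
    """True when the event is older than the last recorded watermark."""
    watermark = holder.get()
    return watermark is not None and event_time < watermark


holder = WatermarkHolder()
# In a real job this call would sit in the listener's progress callback (cheap work only).
holder.update_from_progress({"eventTime": {"watermark": "2025-04-30T16:20:00.000Z"}})

late = is_late(datetime(2025, 4, 30, 16, 15, tzinfo=timezone.utc), holder)
fresh = is_late(datetime(2025, 4, 30, 16, 25, tzinfo=timezone.utc), holder)
```

In a Spark job the holder would live on the driver, so the value would have to be read there (for example inside a foreachBatch function) and passed into the filter as a literal. The recorded watermark can also lag the running batch by one trigger, so treat this as an approximation to validate against your own data.
]]></content:encoded>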
      <pubDate>Wed, 30 Apr 2025 20:36:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117212#M45454</guid>
      <dc:creator>lingareddy_Alva</dc:creator>
      <dc:date>2025-04-30T20:36:06Z</dc:date>
    </item>
    <item>
      <title>Re: How to push late data to DLQ in pyspark structured streaming ?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117608#M45530</link>
      <description>&lt;P&gt;Hi LRALVA,&lt;/P&gt;&lt;P&gt;Can you please describe the workaround in detail so that I can implement it?&lt;/P&gt;</description>
      <pubDate>Sat, 03 May 2025 07:19:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-push-late-data-to-dlq-in-pyspark-structured-streaming/m-p/117608#M45530</guid>
      <dc:creator>PreetiB</dc:creator>
      <dc:date>2025-05-03T07:19:55Z</dc:date>
    </item>
  </channel>
</rss>

