Data Engineering

Reset committed offset of spark streaming to capture missed data

DatabricksUser5
New Contributor II

I have a very straightforward setup between Azure Event Hubs and DLT, using the Kafka endpoint through Spark Structured Streaming.

There were network issues and the stream didn't pick up some events, but it still progressed (and committed) the offset for some reason.

As such, the DLT pipeline now picks up any new data coming into the event hub, but not the events that arrived before the network issue was resolved.

Is there a way to force-reset the offset of the Spark reader so it always starts from earliest? At the moment, setting the desired offset does not work because there is already a committed offset that gets used, but I want to override that.

 

An alternative would be to create a new partition and move the events that were not picked up into it, or to re-ingest the events that sit before the committed offset, but that's really not elegant imo.
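A rough, purely illustrative sketch of that re-ingestion idea: a one-off batch read of the missed offset range from the Event Hubs Kafka endpoint, appended to the target table. The namespace, connection string, per-partition offsets, and table name are all placeholders, and the shaded JAAS class name assumes Databricks' bundled Kafka client.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

namespace = "my-eh-namespace"                          # placeholder Event Hubs namespace
eventhub = "my-eventhub"                               # placeholder: event hub name = Kafka topic
connection_string = "<Event Hubs connection string>"   # placeholder

# The missed offset range, per partition (placeholder values). Every partition of
# the topic must be listed; -2 means "earliest" and -1 means "latest".
starting = '{"my-eventhub": {"0": 1200, "1": 980}}'
ending = '{"my-eventhub": {"0": 1850, "1": 1420}}'

missed = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", f"{namespace}.servicebus.windows.net:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        # On Databricks the Kafka client is shaded; on OSS Spark drop the
        # "kafkashaded." prefix.
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";',
    )
    .option("subscribe", eventhub)
    .option("startingOffsets", starting)   # batch reads honour explicit offsets
    .option("endingOffsets", ending)
    .load()
)

(missed
    .selectExpr("CAST(value AS STRING) AS body", "partition", "offset", "timestamp")
    .write.mode("append")
    .saveAsTable("catalog.schema.bronze_events"))   # placeholder target table
```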

4 REPLIES

K_Anudeep
Databricks Employee

Hello @DatabricksUser5,

You can't override committed offsets in place for a running DLT Kafka/Event Hubs stream. If a pipeline already has a checkpoint created, startingOffsets is ignored. To replay data, you must reset the streaming checkpoints or create a new checkpoint using a FULL REFRESH in DLT, and the events must still be retained in Event Hubs.

For Kafka sources (including the Event Hubs Kafka endpoint), Spark Structured Streaming:

  • Uses startingOffsets only when the streaming query is initially created.
  • After that, it always resumes from the offsets stored in the checkpoint directory, completely ignoring startingOffsets on restart.

You can refer to the KB for more details here: https://kb.databricks.com/streaming/offset-reprocessing-issues-in-streaming-queries-with-a-kafka-sou...
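To make those two points concrete, here is an illustrative DLT sketch (every connection value and name below is a placeholder); the inline comment marks the only situations in which startingOffsets is actually honoured:

```python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze_events", comment="Raw events from the Event Hubs Kafka endpoint")
def bronze_events():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "my-eh-namespace.servicebus.windows.net:9093")  # placeholder
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", "<JAAS config with $ConnectionString>")  # placeholder
        .option("subscribe", "my-eventhub")  # placeholder: event hub name = Kafka topic
        # Honoured only when the table starts with a fresh checkpoint (first run
        # or after a full refresh); on any other restart the offsets committed in
        # the checkpoint win and this option is silently ignored.
        .option("startingOffsets", "earliest")
        .load()
        .select(col("key"), col("value").cast("string").alias("body"), col("timestamp"))
    )
```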

So, below are the mitigations for your scenario:

 

 

Anudeep

This unfortunately only works on pipelines that are not continuous, which mine is.

saurabh18cs
Honored Contributor II

Hi @DatabricksUser5, also look at the Event Hubs retention policy. For example, if your Event Hubs retention is set to 7 days, the events older than 7 days that you are trying to re-process no longer exist in Event Hubs, so no matter what you choose they are gone and you need to replay them afresh.

Also, setting startingOffsets to earliest or latest for Kafka is obeyed only on the first run with a clean checkpoint; after that, the stream will always respect the checkpoint. Be careful about clearing checkpoints, or you may face duplicates in your data if the sink is append-only.
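If you do end up clearing checkpoints and replaying, here is a hedged sketch of protecting an append-only sink against the resulting duplicates, assuming the payload carries a unique event_id and an event_ts timestamp (both placeholder column names; dropDuplicatesWithinWatermark requires Spark 3.5+ / a recent DBR):

```python
events = spark.readStream.table("catalog.schema.bronze_events")   # placeholder source table

deduped = (
    events
    .withWatermark("event_ts", "2 hours")             # bounds the deduplication state
    .dropDuplicatesWithinWatermark(["event_id"])      # drops replayed copies by id
)

(deduped.writeStream
    .option("checkpointLocation", "/Volumes/catalog/schema/checkpoints/silver_events")  # placeholder
    .toTable("catalog.schema.silver_events"))         # placeholder target table
```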

Br

DatabricksUser5
New Contributor II

Thank you K_Anudeep! The REST API is exactly what I was looking for.
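For anyone landing on this thread later, a minimal, hypothetical sketch of the kind of REST call being referred to: starting a pipeline update with a full refresh of selected tables via POST /api/2.0/pipelines/{pipeline_id}/updates. Workspace URL, token, pipeline ID, and table name are placeholders.

```python
import requests

WORKSPACE_URL = "https://adb-1234567890123456.7.azuredatabricks.net"  # placeholder
TOKEN = "<personal access token>"                                     # placeholder
PIPELINE_ID = "<pipeline-id>"                                         # placeholder

resp = requests.post(
    f"{WORKSPACE_URL}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        # Rebuild the checkpoint (and so re-apply startingOffsets) for selected
        # tables only; use {"full_refresh": True} to refresh the whole pipeline.
        "full_refresh_selection": ["bronze_events"],   # placeholder table name
    },
)
resp.raise_for_status()
print(resp.json())   # the response includes the update_id of the triggered update
```

A full refresh of a table recreates its checkpoint, so the stream reads again from startingOffsets, subject to the Event Hubs retention window noted above.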