Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark streaming: Checkpoint not recognising new data

mriccardi
New Contributor II

Hello everyone!

We are currently facing an issue with a stream that has not picked up any new data since July 20.

We've verified that the bronze table contains data that the silver table doesn't have.

Also, according to the logs, the silver stream is running but writing 0 files.

OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:14 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO CommitLog: Getting latest offset 0
22/07/26 12:45:14 INFO MicroBatchExecution: Query start: last started microbatch offset info = Some((0,[{"sourceVersion":1,"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8","reservoirVersion":916406,"index":-1,"isStartingVersion":false}])), last successfully finished microbatch offset info = Some((0,CommitMetadata(0)))
22/07/26 12:45:14 INFO OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:15 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:15 INFO CommitLog: Getting latest offset 0
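
One way to read the log above: the offset JSON inside the `MicroBatchExecution` line records which version of the source Delta table the checkpoint has reached (`reservoirVersion`) and which table it belongs to (`reservoirId`). The following is a minimal sketch, using only the JSON copied from that log line, that pulls those fields out so they can be compared with the current state of the bronze table. `parse_delta_offset` is a hypothetical helper name, not a Spark or Delta API.

```python
import json

# Offset JSON copied verbatim from the MicroBatchExecution log line above.
OFFSET_JSON = (
    '{"sourceVersion":1,'
    '"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8",'
    '"reservoirVersion":916406,"index":-1,"isStartingVersion":false}'
)


def parse_delta_offset(offset_json: str) -> dict:
    """Return the fields of a Delta source offset that matter for debugging.

    Hypothetical helper for illustration; it only parses JSON text.
    """
    raw = json.loads(offset_json)
    return {
        "table_id": raw["reservoirId"],
        "version": raw["reservoirVersion"],
        "index": raw["index"],
        "is_starting_version": raw["isStartingVersion"],
    }


offset = parse_delta_offset(OFFSET_JSON)
print(f"Checkpoint points at source version {offset['version']} "
      f"(index {offset['index']}) of table {offset['table_id']}")
```

If the bronze table's latest version (for example, from `DESCRIBE HISTORY`) is well beyond this number while the stream keeps writing 0 files, that would suggest the stream is not advancing past the recorded offset. A `table_id` that no longer matches the bronze table's id would point to the table having been recreated.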

Basically, the job reads the bronze table, applies some transformations, and writes to our silver path.

# Stream from the bronze Delta table
bronze_df = spark \
        .readStream \
        .format("delta") \
        .load(str(INPUT_PATH))

df = transform(bronze_df)

# Write to the silver path; trigger(once=True) processes the available data, then stops
pc_df = df \
        .writeStream \
        .outputMode("append") \
        .trigger(once=True) \
        .format("delta") \
        .option("checkpointLocation", CHECKPOINT_PATH) \
        .partitionBy("event_date", "event_hour", "ad_type") \
        .queryName("prod_silver_v2") \
        .start(OUTPUT_PATH)

To add: last week we reprocessed this table because we are repartitioning it. On our first run (8 hours), the final step was to optimize the silver table, and the job failed on that step. Afterwards the table had the expected data, but since that run we haven't been able to update it any more.

Could this be related?

Is there any way to "recover" the checkpoint to a previous state?

Thanks in advance!

4 REPLIES

mriccardi
New Contributor II

Also, the trigger is configured to run once, but when we start the job it never ends; it stays in an endless loop.

Did you delete the checkpoint by mistake? If you did, you can use "startingVersion" to define the table version that you would like to start reading from. More details are in the docs: https://docs.databricks.com/delta/delta-streaming.html#specify-initial-position
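
For illustration, here is a minimal sketch of how the suggested option could be applied to a Delta streaming read. `read_delta_stream` and its parameters are hypothetical names, and `spark` is assumed to be an existing SparkSession. Note that, per the linked Delta docs, this option only takes effect when a query starts with a new checkpoint; once progress has been recorded in the checkpoint, it is ignored.

```python
def read_delta_stream(spark, input_path, starting_version=None):
    """Build a streaming read of a Delta table, optionally from a given version.

    Hypothetical helper: `spark` is an existing SparkSession, `input_path` is
    the Delta table path, and `starting_version` is a table version number
    chosen by the caller.
    """
    reader = spark.readStream.format("delta")
    if starting_version is not None:
        # Only honored when the query starts against a fresh checkpoint.
        reader = reader.option("startingVersion", str(starting_version))
    return reader.load(input_path)


# Usage (assumes a live SparkSession and the INPUT_PATH from the question):
# bronze_df = read_delta_stream(spark, str(INPUT_PATH), starting_version=916406)
```

The version number in the usage comment is the `reservoirVersion` from the log in the question, used here purely as an example value.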

Hi @Martin Riccardi,

Just a friendly follow-up. Did you see my previous response? Did it help you? Please let us know.

But how can one set startingVersion in production? The job should pick up the data from where it failed.
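
On resuming from the point of failure: the checkpoint is what normally records that position, which is why `startingVersion` is only consulted for a query with a new checkpoint. A Structured Streaming checkpoint keeps an `offsets` and a `commits` directory whose files are named after the batch id, matching the `OffsetSeqLog` and `CommitLog` lines in the question. The sketch below lists those ids so you can see which batch was last planned and which was last committed. The helper names are hypothetical, and it assumes the checkpoint is reachable as a local filesystem path (on Databricks that might be a `/dbfs/...` mount, which is an assumption).

```python
import os


def list_batch_ids(checkpoint_dir, log_name):
    """Return the sorted batch ids found under <checkpoint_dir>/<log_name>.

    `log_name` is "offsets" or "commits". Files that are not plain batch-id
    names (for example hidden checksum files) are skipped.
    """
    log_dir = os.path.join(checkpoint_dir, log_name)
    if not os.path.isdir(log_dir):
        return []
    return sorted(int(name) for name in os.listdir(log_dir) if name.isdigit())


def resume_state(checkpoint_dir):
    """Summarize the last planned and last committed batch of a checkpoint."""
    offsets = list_batch_ids(checkpoint_dir, "offsets")
    commits = list_batch_ids(checkpoint_dir, "commits")
    return {
        "last_planned_batch": offsets[-1] if offsets else None,
        "last_committed_batch": commits[-1] if commits else None,
    }


# Usage (CHECKPOINT_PATH as in the question, assuming a locally readable path):
# print(resume_state(str(CHECKPOINT_PATH)))
```

If the last planned batch is ahead of the last committed one, the query will retry that planned batch on restart rather than start from a new position.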

I am encountering a similar issue: the checkpoint location is updated with new offset values every 15 minutes, yet the streaming data never loads into the Delta table, even though I can see that the stream is running. I have tried startingOffset=-1/@latest, but neither approach resolves the issue.
