Spark streaming: Checkpoint not recognising new data

mriccardi
New Contributor II

Hello everyone!

We are currently facing an issue with a stream that has not been picking up new data since the 20th of July.

We've validated that the bronze table has data the silver table doesn't have.

Also, looking at the logs, the silver stream is running but writing 0 files.

OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:14 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO CommitLog: Getting latest offset 0
22/07/26 12:45:14 INFO MicroBatchExecution: Query start: last started microbatch offset info = Some((0,[{"sourceVersion":1,"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8","reservoirVersion":916406,"index":-1,"isStartingVersion":false}])), last successfully finished microbatch offset info = Some((0,CommitMetadata(0)))
22/07/26 12:45:14 INFO OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:15 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:15 INFO CommitLog: Getting latest offset 0

Basically, the job reads the bronze table, applies some transformations and writes to our silver path.

# Read the bronze Delta table as a stream
bronze_df = spark \
    .readStream \
    .format("delta") \
    .load(str(INPUT_PATH))

# Apply the silver transformations
df = transform(bronze_df)

# Write to the silver path, tracking progress in the checkpoint
pc_df = df \
    .writeStream \
    .outputMode("append") \
    .trigger(once=True) \
    .format("delta") \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .partitionBy("event_date", "event_hour", "ad_type") \
    .queryName("prod_silver_v2") \
    .start(OUTPUT_PATH)
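
As a side note, a minimal diagnostic sketch (not part of the original job) for checking whether the bronze source has moved past the position recorded in the checkpoint; it only assumes the INPUT_PATH used above and the reservoirVersion visible in the driver log:

# Diagnostic sketch: compare the bronze table's latest Delta version with the
# reservoirVersion the checkpoint logged (916406 in the driver log above).
# If bronze is ahead of that version, the source has data the stream has not
# yet committed to silver.
latest_bronze_version = (
    spark.sql(f"DESCRIBE HISTORY delta.`{INPUT_PATH}`")
    .selectExpr("max(version) AS v")
    .collect()[0]["v"]
)
print(f"Latest bronze version: {latest_bronze_version}")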

To add: last week we reprocessed this table because we are repartitioning it. On our first run (8 hours) the final step was to optimize the silver table, and the job failed on that step. After that we saw that the table had the expected data, but since that run we haven't been able to update the table any more.

Could this be related?

Is there any way to "recover" the checkpoint to a previous state?

Thanks in advance!

4 REPLIES

mriccardi
New Contributor II

Also, the trigger is configured to run once, but when we start the job it never ends; it stays in an endless loop.

Did you delete the checkpoint by mistake? If you did, you can use "startingVersion" to define the offset version that you would like to start reading from; see the sketch below. Here are more docs: https://docs.databricks.com/delta/delta-streaming.html#specify-initial-position
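
For reference, a minimal sketch of how that initial position could be set when restarting from a fresh checkpoint. The version number is taken from the driver log above, and NEW_CHECKPOINT_PATH is a hypothetical placeholder; startingVersion is only honoured when the checkpoint does not already contain offsets:

# Sketch only: restart the silver stream from an explicit bronze version.
bronze_df = (
    spark.readStream
    .format("delta")
    .option("startingVersion", 916406)   # or "latest" to read only new changes
    .load(str(INPUT_PATH))
)

(
    transform(bronze_df)
    .writeStream
    .outputMode("append")
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", NEW_CHECKPOINT_PATH)  # hypothetical new, empty checkpoint
    .partitionBy("event_date", "event_hour", "ad_type")
    .queryName("prod_silver_v2")
    .start(OUTPUT_PATH)
)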

Hi @Martin Riccardi,

Just a friendly follow-up: did you see my previous response, and did it help you? Please let us know.

But how can one assign the startingVersion in production? It should pick up the data from where the job failed.

I am encountering a similar issue where the checkpoint location is consistently updated with new offset values every 15 minutes, yet the streaming data fails to load into the delta table even though I can see the stream is running. I have attempted using startingOffset=-1/@latest, but unfortunately none of these approaches resolves the issue.
