Spark streaming: Checkpoint not recognising new data
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-26-2022 06:10 AM
Hello everyone!
We are currently facing an issue with a stream that is not updating new data since the 20 of July.
We've validated and bronze table has data that silver doesn't have.
Also seeing the logs the silver stream is running but writing 0 files.
OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:14 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO CommitLog: Getting latest offset 0
22/07/26 12:45:14 INFO MicroBatchExecution: Query start: last started microbatch offset info = Some((0,[{"sourceVersion":1,"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8","reservoirVersion":916406,"index":-1,"isStartingVersion":false}])), last successfully finished microbatch offset info = Some((0,CommitMetadata(0)))
22/07/26 12:45:14 INFO OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:15 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:15 INFO CommitLog: Getting latest offset 0
Basically the job reads the bronze table, apply some transformations and write to our silver path.
bronze_df = spark \
.readStream \
.format("delta") \
.load(str(INPUT_PATH))
df = transform(bronze_df)
pc_df = df \
.writeStream \
.outputMode("append") \
.trigger(once=True) \
.format("delta") \
.option("checkpointLocation", CHECKPOINT_PATH) \
.partitionBy("event_date", "event_hour", "ad_type") \
.queryName(f"prod_silver_v2") \
.start(OUTPUT_PATH)
To add: Last week we reprocessed this table as we are repartitioning it. On our first run (8hs) the final step was to optimize the silver table and the job failed on that step. After that we saw that the table had the expected data, but after that run we couldn't update this table any more.
Could this be related?
Is there any way to "recover" the checkpoint to a previous state?
Thanks in advance!
- Labels:
-
Bronze Table
-
Data
-
New Data
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-26-2022 06:15 AM
Also the trigger is configured to run once, but when we start the job it never ends, it keeps in an endless loop.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-29-2022 04:40 PM
Did you delete the checkpoint by mistake? in case you did, then you can use "startingVersio" to define the offset version that you would like to start reading from. Here is more docs https://docs.databricks.com/delta/delta-streaming.html#specify-initial-position
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
08-15-2022 04:09 PM
Hi @Martin Riccardi,
Just a friendly follow-up. Did you see my previous response? did it help you? please let us know
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-10-2023 07:16 AM - edited 12-10-2023 07:17 AM
But how one can assign the startingVersion in production, it should pick the data from where the job got failed.
I am encountering a similar issue where the checkpoint location is consistently updated with new offset values every 15 minutes, yet the streaming data fails to load in delta table although I could see stream is happening. I have attempted using startingOffset=-1/@latest, but unfortunately, none of these approaches seem to resolve the issue