Hello everyone!
We are currently facing an issue with a stream that has not picked up any new data since July 20.
We've verified that the bronze table has data that the silver table doesn't.
Also, according to the logs, the silver stream is running but writing 0 files.
OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:14 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO CommitLog: Getting latest offset 0
22/07/26 12:45:14 INFO MicroBatchExecution: Query start: last started microbatch offset info = Some((0,[{"sourceVersion":1,"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8","reservoirVersion":916406,"index":-1,"isStartingVersion":false}])), last successfully finished microbatch offset info = Some((0,CommitMetadata(0)))
22/07/26 12:45:14 INFO OffsetSeqLog: BatchIds found from listing: 0
22/07/26 12:45:14 INFO OffsetSeqLog: Getting latest offset 0
22/07/26 12:45:15 INFO CommitLog: BatchIds found from listing: 0
22/07/26 12:45:15 INFO CommitLog: Getting latest offset 0
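To make the offset in the log above easier to reason about, here is a minimal sketch that pulls the Delta source offset out of the `MicroBatchExecution` line and compares its `reservoirVersion` with the bronze table's latest version. The helper names (`extract_delta_offsets`, `versions_behind`) are made up for illustration, and the latest version has to be supplied by hand (for example from `DESCRIBE HISTORY` on the bronze table).

```python
import json
import re

# Relevant part of the MicroBatchExecution log line shown above.
LOG_LINE = (
    'last started microbatch offset info = Some((0,[{"sourceVersion":1,'
    '"reservoirId":"271090ee-5d4b-4087-a6a0-5a9760d969d8",'
    '"reservoirVersion":916406,"index":-1,"isStartingVersion":false}]))'
)


def extract_delta_offsets(log_line):
    """Return every Delta source offset (as a dict) embedded in a log line.

    Hypothetical helper: the offset objects are flat JSON, so matching a
    brace-delimited span that mentions "reservoirVersion" is sufficient.
    """
    pattern = r'\{[^{}]*"reservoirVersion"[^{}]*\}'
    return [json.loads(match) for match in re.findall(pattern, log_line)]


def versions_behind(offset, latest_table_version):
    """How many table versions lie between the offset and the latest version."""
    return latest_table_version - offset["reservoirVersion"]


offsets = extract_delta_offsets(LOG_LINE)
print(offsets[0]["reservoirVersion"])
```

If the bronze table's latest version is far ahead of this number on every run, the stream is restarting from the same offset without advancing.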
Basically, the job reads the bronze table, applies some transformations, and writes to our silver path.
bronze_df = spark \
    .readStream \
    .format("delta") \
    .load(str(INPUT_PATH))
df = transform(bronze_df)
pc_df = df \
    .writeStream \
    .outputMode("append") \
    .trigger(once=True) \
    .format("delta") \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .partitionBy("event_date", "event_hour", "ad_type") \
    .queryName("prod_silver_v2") \
    .start(OUTPUT_PATH)
To add: last week we reprocessed this table because we are repartitioning it. On the first run (8 hours), the final step was to optimize the silver table, and the job failed on that step. Afterwards the table had the expected data, but since that run we haven't been able to update it.
Could this be related?
Is there any way to "recover" the checkpoint to a previous state?
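For context on what the checkpoint currently holds, here is a rough sketch of how the batch ids in its `offsets` and `commits` directories could be listed. It assumes the checkpoint is reachable through a local filesystem path (for example via a `/dbfs` mount), which may not match every setup. `list_batch_ids` and `checkpoint_summary` are hypothetical helper names, not part of Spark.

```python
import os


def list_batch_ids(checkpoint_dir, log_name):
    """Return the sorted batch ids stored under <checkpoint_dir>/<log_name>."""
    log_dir = os.path.join(checkpoint_dir, log_name)
    if not os.path.isdir(log_dir):
        return []
    # Batch files are named after their batch id; anything else
    # (hidden checksum files, temp files) is ignored.
    return sorted(int(name) for name in os.listdir(log_dir) if name.isdigit())


def checkpoint_summary(checkpoint_dir):
    """Summarize which batches were planned (offsets) and finished (commits)."""
    offsets = list_batch_ids(checkpoint_dir, "offsets")
    commits = list_batch_ids(checkpoint_dir, "commits")
    return {
        "offsets": offsets,
        "commits": commits,
        # Planned but never committed: these would be re-run on restart.
        "uncommitted": sorted(set(offsets) - set(commits)),
    }
```

Calling `checkpoint_summary(CHECKPOINT_PATH)` on our checkpoint should agree with the log lines above, which list batch id 0 in both the offset log and the commit log.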
Thanks in advance!