09-01-2024 11:56 PM - edited 09-01-2024 11:58 PM
Hello Databricks Community,
I'm encountering an issue with the Databricks Autoloader where, after running successfully for a period of time, it suddenly stops detecting new files in the source directory. This issue only gets resolved when I reset the checkpoint, which forces Autoloader to reprocess all files from scratch. This behavior is unexpected and has disrupted our data pipeline operations. I'm seeking help to understand and resolve this issue.
Environment Details:
Autoloader Configuration:
Code Setup:
sdf = (spark.readStream.format("cloudFiles")
       .option("cloudFiles.format", "json")
       .option("cloudFiles.allowOverwrites", "true")
       .option("cloudFiles.inferColumnTypes", "true")
       .option("badRecordsPath", bad_records_path)
       .schema(schema)
       .load(loading_path))

(sdf.writeStream
 .format("delta")
 .outputMode("append")
 .option("mergeSchema", "true")
 .option("badRecordsPath", bad_records_path)
 .foreachBatch(upsert_to_delta)
 .option("checkpointLocation", checkpoint_path)
 .start(table_path))
Problem Description:
Steps Taken for Troubleshooting:
Additional Context:
Questions:
Any insights, suggestions, or similar experiences would be greatly appreciated!
Thank you!
09-02-2024 04:22 AM - edited 09-02-2024 04:23 AM
@dikokob That's a weird issue. However, there are two things I would check first:
- cloudFiles.maxFileAge: if it's set to None, that's fine. If it's set to any other value, that could cause an issue (https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/production.html#max-file-a...)
- cloudFiles.backfillInterval: it's worth setting that to at least once a week (https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/production.html#trigger-re...)
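For example, something along these lines (a sketch only - "1 week" is just an example interval, not a recommendation, and the commented-out maxFileAge line is only there to show where that option would go):

sdf = (spark.readStream.format("cloudFiles")
       .option("cloudFiles.format", "json")
       .option("cloudFiles.allowOverwrites", "true")
       .option("cloudFiles.inferColumnTypes", "true")
       # leave cloudFiles.maxFileAge unset (None) unless you really need it;
       # a short value makes Auto Loader ignore files it considers too old
       # .option("cloudFiles.maxFileAge", "30 days")
       # trigger an asynchronous backfill periodically so missed files get picked up
       .option("cloudFiles.backfillInterval", "1 week")
       .option("badRecordsPath", bad_records_path)
       .schema(schema)
       .load(loading_path))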
I would also check the bad_records_path directory - maybe files somehow end up in there due to schema inference.
09-02-2024 05:12 AM
Hi @daniel_sahal ,
Thanks for the response:
09-02-2024 06:10 AM
@dikokob Yes, setting backfillInterval should still be worth it even without file notification mode.
The schema inference issue could be handled differently when running a full reload vs. an incremental load. Imagine a situation with JSON files where you've got a complex data type that is constantly changing: during the initial load, a sample of 1,000 files would be taken and their schemas merged, whereas during an incremental load it could end up a little differently.
One thing that came to mind is the lack of .awaitTermination() at the end of the write. Without it, a failure might not be visible, so you could think your code completed without errors.
https://api-docs.databricks.com/python/pyspark/latest/pyspark.ss/api/pyspark.sql.streaming.Streaming...
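For illustration, a minimal sketch of what that could look like (trimmed to the relevant options, using the same variables as your snippet):

query = (sdf.writeStream
         .foreachBatch(upsert_to_delta)
         .option("checkpointLocation", checkpoint_path)
         .start())

# Blocks until the stream stops and re-raises any exception from the query,
# so a failure actually surfaces instead of passing silently.
query.awaitTermination()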
09-02-2024 06:55 AM
@daniel_sahal,
Any suggestion on a backfillInterval value that would work best for my use case?
I will also look into .awaitTermination() and test that out.
I am mainly looking for a solution that will make our ingestion more robust.
Monday
Have you found a solution?