Databricks Autoloader Checkpoint

boitumelodikoko
Databricks Partner

Hello Databricks Community,

I'm encountering an issue with the Databricks Autoloader where, after running successfully for a period of time, it suddenly stops detecting new files in the source directory. This issue only gets resolved when I reset the checkpoint, which forces Autoloader to reprocess all files from scratch. This behavior is unexpected and has disrupted our data pipeline operations. I'm seeking help to understand and resolve this issue.

Environment Details:

  • Databricks Runtime Version: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
  • Cloud Platform: Azure

Autoloader Configuration:

  • File Format: JSON
  • Directory Structure: Files are placed in a flat directory structure in cloud storage (e.g., AWS S3, Azure Blob Storage).
  • File Arrival: Files are added incrementally and may sometimes be overwritten.

Code Setup:

sdf = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.allowOverwrites", 'true')
  .option("cloudFiles.inferColumnTypes", "true")
  .option("badRecordsPath", bad_records_path)
  .schema(schema)
  .load(loading_path))

(sdf.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("badRecordsPath", bad_records_path)
  .foreachBatch(upsert_to_delta)
  .option("checkpointLocation", checkpoint_path)
  .start(table_path))

Problem Description:

  • Issue: Autoloader stops detecting new files after it has been working successfully for some time.
  • Resolution Attempted: Resetting the checkpoint path resolves the issue temporarily, allowing Autoloader to detect and process new files again. However, this approach is not ideal as it forces reprocessing of all files, leading to potential duplication and increased processing time.
  • Expected Behavior: Autoloader should continuously detect and process new files as they are added to the loading_path, without needing to reset the checkpoint.

Steps Taken for Troubleshooting:

  1. Verified that the checkpoint location is consistent and has the correct permissions.
  2. Checked the naming pattern and directory structure of new files.
  3. Ensured no manual changes are made to the checkpoint directory.
  4. Monitored the logs for any specific error messages or warnings related to file detection.
  5. Reviewed and confirmed that there were no significant changes to the data source or the processing logic during the period when the issue began.

Additional Context:

  • Files are sometimes overwritten in the source directory, which is why cloudFiles.allowOverwrites is set to true.
  • Schema changes may occur, so mergeSchema and inferColumnTypes options are enabled to handle schema evolution.
  • Using a custom upsert_to_delta function for handling upserts into Delta tables.
  • The issue occurred unexpectedly, despite the Autoloader working without problems for a considerable amount of time.

Questions:

  1. What could be causing Autoloader to suddenly stop detecting new files after working fine for a while?
  2. Are there any specific best practices or configurations for managing checkpoints and file detection with cloudFiles.mode that might prevent this from happening?
  3. How can I avoid having to reset the checkpoint to detect new files?

Any insights, suggestions, or similar experiences would be greatly appreciated!

Thank you!


Thanks,
Boitumelo