Tuesday
I'm using this code to read data from an ADLS Gen2 location. There are txt files present in sub-folders in the container.
df_stream = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "text") \
.option('cloudFiles.includeExistingFiles', "false") \
.option('cloudFiles.modifiedAfter', '2025-09-09 00:00:00.000000 UTC+0') \
.format("text") \
.load(LANDED_PATH)
I need to skip all the old files in the location, as they have been processed previously. I followed the documentation and used the includeExistingFiles and modifiedAfter options, but they aren't working. The old files are still getting processed. Why are these options not working?
Tuesday
Hi @tabinashabir ,
I think one explanation for why includeExistingFiles doesn't work is that this option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect.
But it's weird that modifiedAfter doesn't work in your case. Maybe try experimenting with a different timestamp string?
Something like this article from Medium suggests?
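For what it's worth, here is a small sketch of what experimenting with the timestamp string could look like. The first candidate is the format already used in your original snippet; the second is an ISO 8601 variant. Which strings Auto Loader accepts can depend on the runtime, so treat these as candidates to try, not a definitive answer.
# Candidate timestamp strings for cloudFiles.modifiedAfter -- try one at a time.
candidates = [
    "2025-09-09 00:00:00.000000 UTC+0",  # format used in the original snippet
    "2025-09-09T00:00:00.000Z",          # ISO 8601 style variant
]

df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("cloudFiles.includeExistingFiles", "false")
    .option("cloudFiles.modifiedAfter", candidates[0])  # swap in candidates[1] to compare behaviour
    .load(LANDED_PATH)
)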
yesterday
Hi @szymon_dybczak ,
Thanks for your reply.
I did try multiple date formats supported in Azure Databricks. I also tried using
.option('cloudFiles.includeExistingFiles', False). It still didn't work.
I cleared the checkpoints each time.
Tuesday
Root cause:
includeExistingFiles is only evaluated the first time the stream is started with a fresh checkpoint. If the stream is restarted or the checkpoint folder is reused, changing this option has no effect on subsequent runs, and old files that were previously seen or left untracked can be reprocessed.
To reliably skip existing files, you must use a new (clean) checkpoint location when starting the stream. Otherwise, Auto Loader will refer to the checkpoint's metadata about already-processed files and to the options captured when the stream was first initialized.
Solution:
1. Identify Your Container Path and Set a Clean Checkpoint Location
Determine the correct storage path (e.g., for ADLS Gen2: abfss://<container>@<account>.dfs.core.windows.net/<path>).
Choose a brand new checkpoint location that Auto Loader has never used. Example: abfss://<container>@<account>.dfs.core.windows.net/<path>/checkpoints/autoloader_run1.
Old checkpoint locations must not be reused; they store processed file metadata.
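For illustration, the paths from this step could be declared up front as variables (all values below are placeholders, not paths from this thread), so the code in step 3 can reference them:
# Placeholder paths -- substitute your own container, storage account, and folders.
LANDED_PATH = "abfss://<container>@<account>.dfs.core.windows.net/<path>"

# A checkpoint location Auto Loader has never used before; never reuse an old one.
CHECKPOINT_PATH = "abfss://<container>@<account>.dfs.core.windows.net/<path>/checkpoints/autoloader_run1"

# Target location for the Delta output written by the stream.
OUTPUT_PATH = "abfss://<container>@<account>.dfs.core.windows.net/<path>/output"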
2. Set the Key Options in the Stream Reader
Use .option("cloudFiles.includeExistingFiles", "false") to exclude files present before the stream starts.
(Optional) Add .option("cloudFiles.modifiedAfter", "<timestamp>") if you know new files will have a modified time after the specified value.
Specify the correct cloudFiles.format (e.g., "text" for txt files).
3. Start the Stream (Python Example)
Replace <LANDED_PATH> and <CHECKPOINT_PATH> with your paths.
python
# cloudFiles.modifiedAfter is optional, for finer control over which files are picked up
df_stream = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "text") \
.option("cloudFiles.includeExistingFiles", "false") \
.option("cloudFiles.modifiedAfter", "2025-09-09T00:00:00.000Z") \
.load(LANDED_PATH)
query = df_stream.writeStream \
.format("delta") \
.option("checkpointLocation", CHECKPOINT_PATH) \
.start(OUTPUT_PATH)
Do not reuse any checkpoint directories from previous runs.
4. Validate File Processing
Upload a test file to your source folder. Only files that arrive after the streaming job starts will be ingested.
Old files will be skipped unless their lastModified attribute is after your specified timestamp in cloudFiles.modifiedAfter.
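If useful, here is a small sketch (my own, not from the Databricks docs) for checking the last-modified times Auto Loader would compare against. It assumes a recent Databricks Runtime where dbutils.fs.ls returns a modificationTime field in epoch milliseconds; on older runtimes that field may not be present.
from datetime import datetime, timezone

# Print each file's last-modified time so it can be checked against the
# cloudFiles.modifiedAfter cutoff. dbutils.fs.ls only lists one level, so point
# it at the specific sub-folder you want to inspect.
for f in dbutils.fs.ls(LANDED_PATH):
    modified = datetime.fromtimestamp(f.modificationTime / 1000, tz=timezone.utc)
    print(f"{f.path}  last modified: {modified.isoformat()}")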
5. Forcing Exclusion of Old Files
If any old file does get processed, re-check that your checkpoint location is new and that the file's lastModified attribute is what you expect.
If re-running or reconfiguring, always delete or provide a new checkpoint directory.
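As a minimal sketch of that last point, either remove the stale checkpoint or generate a unique one per run (CHECKPOINT_PATH is assumed to be a throwaway checkpoint folder used only by this stream, since the delete is recursive and destructive):
import uuid

# Option A: remove the old checkpoint so the next run is treated as a brand-new stream.
# WARNING: recursive delete -- only point this at a checkpoint folder that is safe to discard.
dbutils.fs.rm(CHECKPOINT_PATH, True)

# Option B: generate a unique checkpoint path for each run instead of deleting.
CHECKPOINT_PATH = f"{CHECKPOINT_PATH.rstrip('/')}_{uuid.uuid4().hex[:8]}"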
Let me know if it works
yesterday
Hi @ManojkMohan ,
Thanks for your reply.
The checkpoint location is set to a new path. I tried multiple approaches and cleared the checkpoint location each time so it's treated as the first run. I followed all the steps you've mentioned above, but it's still processing all the old files.
The only difference I see is that I'm using a trigger in the write stream.
df_stream.writeStream \
.format("delta") \
.trigger(availableNow=True) \
.option("checkpointLocation", CHECKPOINT_PATH) \
.start(OUTPUT_PATH)
I did try multiple date formats supported in Azure Databricks. I also tried using
.option('cloudFiles.includeExistingFiles', False). It still didn't work.
yesterday
Always use a brand new, clean checkpoint location when starting your stream to skip existing files.
Example checkpoint path: abfss://<container>@<account>.dfs.core.windows.net/<path>/checkpoints/autoloader_run1
Avoid .trigger(availableNow=True) if you want incremental processing.
Prefer the default micro-batch trigger, which processes new files incrementally, respecting includeExistingFiles=false and the checkpoint metadata.
Validate and format timestamps correctly in cloudFiles.modifiedAfter.
Use ISO8601 format with timezone info, e.g., 2025-09-09T00:00:00.000Z.
Ensure that the files to be skipped have correct last modified timestamps matching your filter criteria.
Verify source folder contents.
Remove or archive old files from the source directory if possible.
Use directory listing mode or file notification mode, properly configured for your cloud storage source (see the sketch after the code below).
Ensure the stream is configured with these options:
python
# The timestamp uses ISO 8601 format; NEW_CHECKPOINT_PATH must point to a brand new checkpoint location
df_stream = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "text") \
.option("cloudFiles.includeExistingFiles", "false") \
.option("cloudFiles.modifiedAfter", "2025-09-09T00:00:00.000Z") \
.load(LANDED_PATH)
query = df_stream.writeStream \
.format("delta") \
.option("checkpointLocation", NEW_CHECKPOINT_PATH) \
.start(OUTPUT_PATH)
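On the directory listing vs. file notification point above, a hedged sketch of switching Auto Loader to file notification mode. cloudFiles.useNotifications is a documented Auto Loader option, but on ADLS Gen2 notification mode also needs Azure-side setup (Event Grid subscription, queue access, and service principal credentials) that is not shown here:
# Sketch only: file notification mode avoids repeatedly listing large directories.
# Extra Azure configuration (Event Grid, queue permissions, service principal
# options) is required for ADLS Gen2 and is omitted from this snippet.
df_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "text") \
    .option("cloudFiles.includeExistingFiles", "false") \
    .option("cloudFiles.useNotifications", "true") \
    .load(LANDED_PATH)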
If old files keep getting processed, delete and recreate the checkpoint folder, or use a new, unique checkpoint location for each run.
Use Auto Loader metrics and logs to monitor which files get processed and to identify unexpected behavior.
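For the monitoring point, a small sketch using the standard Structured Streaming progress API on the query started above. The per-source metrics dictionary is where Auto Loader typically reports figures such as numFilesOutstanding, but the exact metric names are an assumption here; check what your runtime actually emits.
# Inspect recent micro-batch progress to see how many rows each source produced
# and what source-level metrics (e.g. backlog counters) were reported.
for progress in query.recentProgress:
    for source in progress["sources"]:
        print(source.get("description"),
              source.get("numInputRows"),
              source.get("metrics", {}))  # Auto Loader may report backlog metrics here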