09-11-2024 12:31 PM - edited 09-11-2024 12:33 PM
Hello, I am working on a DLT pipeline and I've been facing an issue. I hope someone here can help me find a solution.
My files are JSON files in Azure Storage. They are stored in directories like this: blobName/FolderName/xx.json.
The folder names look like this: 2024-08-20T12.00.00Z. We receive these files throughout the day, and the folder name changes each time. The problem is that when I run the pipeline, I need to exclude today's files.
Here's part of my script:
.load(blobName + "/" + FolderName + "/*.json"). So if I run it today, I do .load(blobName + "/" + "2024-09-11*Z" + "/*.json"). This works fine and picks up all files from 2024-09-11. The problem is the following day: when I change it to .load(blobName + "/" + "2024-09-12*Z" + "/*.json"), it doesn't pick up all the files. I think it only picks up the files that arrived after yesterday's run. Am I doing something wrong with the "*" wildcard in my path? I appreciate any help.
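To check whether the wildcard itself is the culprit, here is a small sketch that builds the path for one day and tests the folder part of the pattern against some folder names. It uses Python's `fnmatch` as a stand-in for the glob matching Spark applies to load paths; that is an assumption (Hadoop globs and `fnmatch` differ in some details, for example whether `*` crosses `/`), but they agree for a simple pattern like `2024-09-11*Z`. The folder names and the helpers `day_path` and `matching_folders` are hypothetical.

```python
from fnmatch import fnmatch
from typing import List


def day_path(blob_name: str, day: str) -> str:
    # Build "<blob>/<day>*Z/*.json": every folder stamped with that day, every JSON file in it.
    return blob_name + "/" + day + "*Z" + "/*.json"


def matching_folders(folders: List[str], day: str) -> List[str]:
    # Apply only the folder-level part of the pattern ("<day>*Z") to each folder name.
    return [f for f in folders if fnmatch(f, day + "*Z")]


# Hypothetical folder names in the question's format (yyyy-MM-ddTHH.mm.ssZ).
folders = [
    "2024-09-11T00.00.00Z",
    "2024-09-11T12.00.00Z",
    "2024-09-11T23.30.00Z",
    "2024-09-12T01.00.00Z",
]

print(day_path("blobName", "2024-09-11"))       # blobName/2024-09-11*Z/*.json
print(matching_folders(folders, "2024-09-11"))  # the three folders from the 11th
```

Every folder from the 11th matches and the one from the 12th does not, which suggests the `*` is not what drops files.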
09-11-2024 12:44 PM
Hi @standup1 ,
Auto Loader uses a checkpointing mechanism to track which files have been processed. When you run your DLT pipeline, Auto Loader remembers which files were processed in earlier runs and will not reprocess them.
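The checkpoint behaviour can be illustrated with a toy model: the "checkpoint" is just a set of file paths already ingested, and each run picks up only matching files that are not in that set. This is a hypothetical class (`ToyCheckpoint`) written for illustration, not Auto Loader's actual implementation or API.

```python
from fnmatch import fnmatch
from typing import List, Set


class ToyCheckpoint:
    """Toy model of file-level checkpointing (NOT Auto Loader's real implementation).

    It only mimics the behaviour described above: a file recorded as processed
    is never picked up again, even if it still matches the load pattern.
    """

    def __init__(self) -> None:
        self.processed: Set[str] = set()

    def run(self, available: List[str], pattern: str) -> List[str]:
        # Note: fnmatch's "*" also matches "/", unlike a Hadoop glob; that makes
        # no difference for the sample paths used here.
        new_files = [p for p in available
                     if fnmatch(p, pattern) and p not in self.processed]
        # Everything read in this run is recorded, so later runs skip it.
        self.processed.update(new_files)
        return new_files


cp = ToyCheckpoint()
pattern = "blobName/2024-09-11*Z/*.json"
first = ["blobName/2024-09-11T08.00.00Z/a.json", "blobName/2024-09-11T09.00.00Z/b.json"]
print(cp.run(first, pattern))  # both files are new
print(cp.run(first + ["blobName/2024-09-11T22.00.00Z/c.json"], pattern))  # only c.json
```

The second run returns only the file that arrived after the first run, which matches the symptom described in the question.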
Do you really want to reprocess the existing files from previous days?
09-11-2024 12:56 PM
Thanks for your reply. That makes a lot of sense. So it looks like DLT scans those files but doesn't load all of them into the DataFrame: the next day, when I run it to pick up the previous day's data, it only brings in some of them. I need to reprocess those files, but only the ones from yesterday. Do you know if there's any workaround? I tried option("cloudFiles.includeExistingFiles", "true"), but that didn't help.
09-11-2024 01:29 PM
option("cloudFiles.includeExistingFiles", "true") will not work.
This option controls whether the files already in the folder are processed when Auto Loader starts, so it only matters on the first run of the stream, and it defaults to true.
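A toy model of when the flag is consulted may make this clearer: it is read only when there is no checkpoint yet, so setting it to "true" on a pipeline that has already run does not bring old files back. `ToyStream` is a hypothetical illustration, not Auto Loader code.

```python
from typing import List, Optional, Set


class ToyStream:
    """Toy model of when cloudFiles.includeExistingFiles is consulted (illustration only).

    The flag is read only when there is no checkpoint yet (the first start);
    on every later start the recorded state decides and the flag is ignored.
    """

    def __init__(self) -> None:
        self.processed: Optional[Set[str]] = None  # None means "no checkpoint yet"

    def start(self, files: List[str], include_existing_files: bool = True) -> List[str]:
        if self.processed is None:
            # First start: with the flag off, files already present are marked as
            # seen without being processed; with it on, they are all processed.
            self.processed = set() if include_existing_files else set(files)
        new_files = [f for f in files if f not in self.processed]
        self.processed.update(new_files)
        return new_files


s = ToyStream()
print(s.start(["a.json", "b.json"], include_existing_files=False))  # []
print(s.start(["a.json", "b.json", "c.json"], include_existing_files=True))  # ['c.json']
```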
Could you explain once more what you are trying to achieve?
Why do you want to reprocess files that were already processed?
Do I understand correctly that the requirement is to exclude today's files?
In the code you shared you are doing .load(blobName + "/" + FolderName + "/*.json"), so it should pick up all the files that were not processed previously.
09-11-2024 02:20 PM - edited 09-11-2024 02:24 PM
"Do I understand correctly that the requirement is to exclude today's files?" Yes, exactly.
"In the code you shared you are doing .load(blobName + "/" + FolderName + "/*.json"), so it should pick up all the files that were not processed previously." This is the problem: it doesn't process all the files.
So what I am trying to process are the files that were excluded from the previous run. For example, today (September 11th) the pipeline should process all files up to yesterday, excluding today's files since they're incomplete. Tomorrow (September 12th) the pipeline should process all files up to September 11th, including the files that were excluded on the 11th, but still exclude the files from the 12th. In essence, the pipeline should always exclude files from the day it runs, but it can process any files from earlier days.
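The rule above (process every folder dated before the run day, including anything skipped on earlier runs) comes down to a date comparison on the folder name. A minimal pure-Python sketch, assuming folder names always follow the `yyyy-MM-ddTHH.mm.ssZ` pattern from the question and that the calendar date in the folder name is what counts as "the day"; `folder_date` and `folders_to_process` are hypothetical helpers.

```python
from datetime import date, datetime
from typing import List

# Folder name format from the question, e.g. 2024-08-20T12.00.00Z
FOLDER_FORMAT = "%Y-%m-%dT%H.%M.%SZ"


def folder_date(folder: str) -> date:
    # Parse the folder timestamp and keep only the calendar day.
    return datetime.strptime(folder, FOLDER_FORMAT).date()


def folders_to_process(folders: List[str], run_day: date) -> List[str]:
    # Keep every folder from a day strictly before the run day; the run day's folders are skipped.
    return [f for f in folders if folder_date(f) < run_day]


folders = [
    "2024-09-10T08.00.00Z",
    "2024-09-10T23.00.00Z",
    "2024-09-11T09.00.00Z",
    "2024-09-12T07.00.00Z",
]
print(folders_to_process(folders, date(2024, 9, 11)))  # only the two folders from the 10th
print(folders_to_process(folders, date(2024, 9, 12)))  # now also the folder from the 11th
```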
09-12-2024 12:00 AM
Hi @standup1 ,
To do so, you need to calculate today_date, add a file_date column to the DataFrame, and then keep only the records where file_date is earlier than today_date:
My folders: [screenshot not included]
The code to get the files, excluding today's folders, based on the folder name: [screenshot not included]
Output: [screenshot not included]
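The three steps described in this reply (compute today's date, derive file_date from the folder in each row's source path, keep rows with file_date earlier than today) can be sketched in plain Python, with a list of dicts standing in for DataFrame rows. The `file_path` key, the regex, and the helpers `with_file_date` and `before_today` are assumptions made for illustration; the original code was not preserved.

```python
import re
from datetime import date, datetime, timezone
from typing import Dict, List, Optional

# Extracts "2024-09-11" from a path like ".../2024-09-11T12.00.00Z/file.json".
FOLDER_DATE = re.compile(r"/(\d{4}-\d{2}-\d{2})T[^/]*Z/")


def with_file_date(rows: List[Dict]) -> List[Dict]:
    # Counterpart of adding a file_date column derived from each row's source path.
    out = []
    for row in rows:
        m = FOLDER_DATE.search(row["file_path"])
        out.append({**row, "file_date": date.fromisoformat(m.group(1)) if m else None})
    return out


def before_today(rows: List[Dict], today: Optional[date] = None) -> List[Dict]:
    # today_date defaults to the current UTC date; pass it explicitly to test.
    today = today or datetime.now(timezone.utc).date()
    # Keep rows whose folder date is strictly earlier than today; rows whose path
    # has no recognizable folder date are dropped as well.
    return [r for r in with_file_date(rows)
            if r["file_date"] is not None and r["file_date"] < today]


rows = [
    {"file_path": "blobName/2024-09-10T12.00.00Z/a.json", "id": 1},
    {"file_path": "blobName/2024-09-11T08.00.00Z/b.json", "id": 2},
]
print([r["id"] for r in before_today(rows, today=date(2024, 9, 11))])  # [1]
```

In PySpark the same steps would roughly map to `F.regexp_extract` on the source file path (for example the `_metadata.file_path` column, if your runtime exposes it), `F.to_date`, and a filter against `F.current_date()`; treat that mapping as an assumption to verify. Also keep in mind that if this filter runs on a streaming Auto Loader read, the files whose rows are filtered out are still recorded in the checkpoint, so it is worth checking whether they are picked up again on the next day's run.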
Hope it helps
09-12-2024 08:12 AM
Hi @filipniziol ,
Thank you so much for sharing this example with me. This is very helpful. I appreciate your help.
09-12-2024 08:26 AM
Hi @standup1 , I'm glad the example was helpful