10-23-2024 05:10 AM - edited 10-23-2024 05:12 AM
Hello,
I am trying to ingest CSV data with Auto Loader from an Azure Data Lake. I want to perform batch ingestion by using a scheduled job and the following trigger:
.trigger(availableNow=True)
The CSV files are generated by Azure Synapse Link. If more than five minutes have passed since the last recorded change to a table in Microsoft Dataverse, a new CSV file gets written to the data lake recording the changes made. In the following five minutes, the new CSV file can still get rows inserted into it if a change to the table is made.
If Auto Loader gets triggered right after a new CSV file is created, it could potentially miss out on changes that will be written to the (now already ingested) CSV file.
A solution that I thought would work was to use Auto Loader with the modifiedBefore option, and specify a timestamp of datetime.utcnow() - timedelta(minutes=5).
This seemed to work at first: a file that isn't older than five minutes is successfully ignored. However, when Auto Loader is run again later, it doesn't ingest the CSV file that was previously ignored. It seems that Auto Loader registers the CSV file (which was less than 5 minutes old during the initial run) and therefore doesn't ingest the file during the second run when the CSV file is now older than 5 minutes.
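For anyone following along, the setup described above can be sketched roughly as follows. This is a minimal sketch, not the exact job code: the function names, the source_path and schema_location parameters, and the five-minute default are assumptions for illustration, and the timestamp string follows the example format that the Databricks documentation gives for modifiedBefore.

```python
from datetime import datetime, timedelta, timezone


def modified_before_cutoff(now=None, quiet_minutes=5):
    """Return a UTC timestamp string `quiet_minutes` before `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now.astimezone(timezone.utc) - timedelta(minutes=quiet_minutes)
    # Format modelled on the documented example "2021-01-01 00:00:00.000000 UTC+0".
    return cutoff.strftime("%Y-%m-%d %H:%M:%S.%f") + " UTC+0"


def build_stream(spark, source_path, schema_location, cutoff):
    """Build an Auto Loader reader that skips files modified after `cutoff`.

    `spark` is the active SparkSession; `source_path` and `schema_location`
    are placeholders (assumptions) for your own storage locations.
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", schema_location)
        .option("modifiedBefore", cutoff)
        .load(source_path)
    )

# The scheduled job would then write the stream with
# .trigger(availableNow=True), as mentioned at the top of this post.
```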
Is this the intended behavior of the use of the modifiedBefore option? Or are my observations wrong? If this is the intended behavior, are there any simple workarounds achievable by setting another option in Auto Loader?
Thanks for any help with this,
Gil
10-23-2024 08:57 AM
Hi @gilt,
How are you doing today?
As I understand it, the modifiedBefore option seems to mark the file as processed during the first trigger, even if the file is still incomplete. This behavior might be expected, because Auto Loader registers the file in its metadata. One potential solution is to delay triggering the ingestion job, allowing enough time for the CSV file to be fully written. Alternatively, you could experiment with letting Auto Loader pick up files again when their content changes. Note that ignoreChanges is an option of the Delta table streaming source rather than of Auto Loader; the closest Auto Loader setting is cloudFiles.allowOverwrites, which allows a modified file to be ingested again. Also, consider using a watermarking strategy or checking for file size stability before ingestion to avoid ingesting partial files.
Give it a try and let me know.
Regards,
Brahma
2 weeks ago
Hi @gilt , did you get a workaround for your problem? I have the same issue.
2 weeks ago - last edited 2 weeks ago
Hi, @anasimiao
I did find a workaround, but ended up not using the modifiedBefore option in Auto Loader. Using modifiedBefore would have been the cleanest and easiest solution, but it doesn't seem to be intended for the purpose of reading files after they have reached a certain age (if they have already been ignored by an earlier run of Auto Loader).
In my case, every new CSV file gets written to a timestamped folder. I start by listing out all of these folders using
folders = dbutils.fs.ls("path-to-folder-containing-timestamped-folders")
Then I convert each one to datetime format and keep only the ones older than 5 minutes. (Actually, in my case I know that all folders except for the most recent one contain CSV files that are done being written to, so I simply sort the folders by timestamp and remove the most recent one).
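The filtering step described above could look roughly like this. It is only a sketch under a few assumptions: the folder names follow the pattern shown in the example further down (such as 2024-10-30T11.51.47Z), the names come from the .name attribute of the entries returned by dbutils.fs.ls (which ends in "/" for directories), and the helper names are hypothetical.

```python
from datetime import datetime, timezone

# Assumed folder name pattern, e.g. "2024-10-30T11.51.47Z".
FOLDER_FORMAT = "%Y-%m-%dT%H.%M.%SZ"


def parse_folder_timestamp(name):
    """Parse a timestamped folder name into a timezone-aware UTC datetime."""
    parsed = datetime.strptime(name.rstrip("/"), FOLDER_FORMAT)
    return parsed.replace(tzinfo=timezone.utc)


def completed_folders(folder_names):
    """Return all timestamped folders except the most recent one, oldest first.

    Entries that do not match the timestamp pattern are skipped.
    """
    stamped = []
    for name in folder_names:
        clean = name.rstrip("/")
        try:
            stamped.append((parse_folder_timestamp(clean), clean))
        except ValueError:
            continue  # not a timestamped folder
    stamped.sort()
    # Drop the newest folder, which may still be receiving rows.
    return [name for _, name in stamped[:-1]]

# In a notebook, the input would come from something like:
#   names = [f.name for f in folders]
```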
I then construct a string that looks like this:
timestamped_folders = "{2024-10-30T11.51.47Z,2024-10-30T11.56.48Z}"
storage_path = f"path-to-folder-containing-timestamped-folders/{timestamped_folders}"
...
df = spark.readStream.format("cloudFiles")
...
.load(storage_path)
...
The storage_path string is constructed using the curly brackets glob pattern so that Auto Loader will only consider files in either of the two timestamped folders "2024-10-30T11.51.47Z" or "2024-10-30T11.56.48Z", and not in any other folder. Read more about this here.
As a result, Auto Loader is restricted from considering files in the most recent timestamped folder (which I want it to ignore). The next time my Databricks job runs and more recent timestamped folders are written to my cloud storage, Auto Loader will be able to consider CSV files in the timestamped folder which used to be the most recent one.
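Building the path with the curly-bracket pattern can be sketched as below. The helper name build_glob_path is hypothetical, and raising an error when there are no completed folders yet is my own assumption about how to handle that case; you might prefer to skip the run instead.

```python
def build_glob_path(base_path, folder_names):
    """Join folder names into a {a,b,c} glob under `base_path`."""
    if not folder_names:
        # Assumption: with nothing to read, fail loudly instead of
        # passing an empty "{}" pattern to Auto Loader.
        raise ValueError("no completed folders to read yet")
    joined = ",".join(folder_names)
    return base_path.rstrip("/") + "/{" + joined + "}"


storage_path = build_glob_path(
    "path-to-folder-containing-timestamped-folders",
    ["2024-10-30T11.51.47Z", "2024-10-30T11.56.48Z"],
)
# storage_path is then passed to .load(storage_path) as shown above.
```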
This is a bit of a hacky/manual approach, but I hope it's clear enough. As a note, I initially wanted to use a glob pattern to specify which folder to ignore (the most recent one), instead of which folders to consider (all but the most recent one), because it would be a lot simpler. However, if a new folder appears in my cloud storage between the initial run of
folders = dbutils.fs.ls("path-to-folder-containing-timestamped-folders")
and the run of Auto Loader, then I would be telling Auto Loader to ignore the second most recent folder, because a new folder was just written to cloud storage. So, as a safeguard against this, I decided to explicitly tell Auto Loader which folders it could consider, instead of telling Auto Loader which folder it couldn't consider.
2 weeks ago
Hi, @gilt
Thank you for sharing your solution. Does it work with notification mode, or are you using directory listing?
2 weeks ago
Hi, @anasimiao
I have only tested with directory listing mode.
2 weeks ago
Hi @gilt, this is the expected behavior of Auto Loader. If you run Auto Loader with the modifiedBefore option, a file that has been ignored once will not be considered again.
It is recommended that you write only complete files to the Auto Loader source directory. If your source system writes files continuously and a file takes some time to finish, you should first write the data to a separate directory while the source system is still writing to the file. When the file writer closes the file, copy it to the Auto Loader source directory, so that Auto Loader only ever sees complete files.
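As a rough illustration of the copy step, here is a minimal sketch that uses the local filesystem as a stand-in for cloud storage. The function name and directory layout are assumptions; on Databricks the copy itself would more likely be done with dbutils.fs.cp or a storage-side tool, and deciding when a file is complete is left to the source system.

```python
import shutil
from pathlib import Path


def publish_completed_file(staged_file, source_dir):
    """Copy a finished file from the staging area into the source directory.

    Call this only after the writer has closed `staged_file`.
    """
    staged_file = Path(staged_file)
    source_dir = Path(source_dir)
    source_dir.mkdir(parents=True, exist_ok=True)
    target = source_dir / staged_file.name
    shutil.copy2(staged_file, target)
    return target
```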
2 weeks ago
Hi, @Lakshay
Thanks for the response. Your suggested approach would indeed work, but it would require implementing some additional logic to trigger the copying of the file.
2 weeks ago
Hi @Lakshay, in this case I don't have control over the file generation. The Azure Synapse Link documentation just advises that we should consume data only from the previous timestamped folders and ignore the last one, since it keeps updating the last file until a new timestamped folder is generated and becomes the current one. In this case, maybe Auto Loader is not suitable?