โ10-03-2023 03:59 AM
TLDR format: How do I make a structured streaming job using autoloader read files using InMemoryFileIndex instead of DeltaFileOperations?
I'm running a structured streaming job from an external (ADLS Gen2, abfss://), storage account which has avro files in a structure of /path/{yyyy-mm-dd}/{hh-mm-ss}.
When I run the job I can see from the log4j logs that it runs a DeltaFileOperations on every directory and subdirectory. As there is one subdirectory for every second and we have a bit more than 2 years of data, there are 75 million subdirectories. This fills up the logs and takes a long time to list out.
23/10/03 09:05:48 INFO DeltaFileOperations: Listing abfss:/{container}@{storage_account}.dfs.core.windows.net/path/2021-05-20/22-09-03
The main problem however is that it the the jobs spends time listing out all the already processed directories, before it starts any actual spark tasks of processing new data. For example it spends ~1.5 hours listing 2 years of directories that are already processed, then it comes to the new day of data and processed that in a few minutes. This time spent listing out previously processed directories is unecessary, and is what I want to cut down. This is shown in the screenshot from the Spark UI below, where the Spark executors are added around 10:30, and the actual spark tasks start around 12:15. The time between 10:30 and 12:15 is just spend on DeltaFileOperations listing operations.
The actual structured streaming code I'm running can be found here.
โ10-05-2023 01:01 AM - edited โ10-05-2023 01:12 AM
Hi @Retired_mod and thank you for your response.
Ok, I see that DeltaFileOperations are a part of how Auto Loader operates. But I'm guessing it's not expected behavior that Auto Loader should run DeltaFileOperations to list directories that have been previously outlined?
As described in the post, in my case it is using almost 2 hours to list out directories of data from 2021 to 2023, which has already been processed by previous runs by the structured streaming job. To me, it seems like it only starts processing new data after it has listed out all of these directories up to where it hasn't processed data before. That makes it very inefficient if running as a daily job where almost 2 hours are spent on listing directories and 3 minutes is spent on processing the new day of data.
Is there a way to allow it to skip listing previously processed files, or is there something that I must be doing wrong?
โ08-06-2024 12:17 AM - edited โ08-06-2024 12:22 AM
Hi @Kaniz_Fatma I am facing a similar issue with one of my pipeline that is being executed on an hourly basis and was running fine (with execution time getting to max 15 min for every hourly run) using the below code:
โ12-05-2024 09:15 AM
It is not possible to modify how listing is done. The standard recommendation is to switch from directory listing mode to file notification mode with large directories. If you have any control over the structure of your source data, I do not recommend having data at a minute/second granularity, especially on ADLS gen2.
โ12-05-2024 09:17 AM
The best way to debug this is to look at the Spark UI to see if a job has been launched. One thing to call out is that trigger.Once is deprecated - we recommend using trigger.availableNow instead to avoid overwhelming the cluster.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group