Data Engineering

How to make structured streaming with autoloader efficiently and incrementally read files

vmpmreistad
New Contributor II

TL;DR: How do I make a structured streaming job that uses Auto Loader read files using InMemoryFileIndex instead of DeltaFileOperations?

I'm running a structured streaming job that reads from an external storage account (ADLS Gen2, abfss://) containing Avro files in a /path/{yyyy-mm-dd}/{hh-mm-ss} directory structure.
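This layout is relevant because, as I understand the Databricks docs on directory listing mode, Auto Loader's incremental listing relies on paths that sort lexically in the order they arrive. A small sketch (the `leaf_dir` helper is hypothetical, not part of any library) checks that zero-padded {yyyy-mm-dd}/{hh-mm-ss} names have that property:

```python
from datetime import datetime, timedelta

def leaf_dir(base: str, ts: datetime) -> str:
    """Build the {base}/{yyyy-mm-dd}/{hh-mm-ss} directory that holds one second of data."""
    return f"{base}/{ts:%Y-%m-%d}/{ts:%H-%M-%S}"

start = datetime(2021, 5, 20, 22, 9, 3)
# Offsets chosen to cross a minute boundary (+57 s) and a day boundary (+2 h)
paths = [leaf_dir("/path", start + timedelta(seconds=s)) for s in (0, 57, 2 * 3600)]
print(paths)
# Zero-padded fields mean sorting the strings preserves arrival order
print(paths == sorted(paths))  # True
```

If the names were not zero-padded (for example 2021-5-9 vs 2021-5-10), string order and time order would diverge.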

When I run the job, the log4j logs show that DeltaFileOperations lists every directory and subdirectory. Because there is one subdirectory for every second and we have a bit more than 2 years of data, there are about 75 million subdirectories. This floods the logs and takes a long time to list. The log lines look like this:

23/10/03 09:05:48 INFO DeltaFileOperations: Listing abfss:/{container}@{storage_account}.dfs.core.windows.net/path/2021-05-20/22-09-03
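A back-of-the-envelope check of that directory count, assuming one {hh-mm-ss} directory exists for every second (seconds with no data would lower the number):

```python
SECONDS_PER_DAY = 24 * 60 * 60  # one {hh-mm-ss} leaf directory per second

def leaf_dir_count(days: int) -> int:
    """Leaf directories produced by `days` days of fully populated per-second data."""
    return days * SECONDS_PER_DAY

def days_covered(leaf_dirs: int) -> float:
    """Days of data implied by a given number of per-second leaf directories."""
    return leaf_dirs / SECONDS_PER_DAY

print(leaf_dir_count(2 * 365))          # 63072000
print(round(days_covered(75_000_000)))  # 868, roughly 2.4 years
```

So 75 million leaf directories is consistent with "a bit more than 2 years" of per-second directories.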

The main problem, however, is that the job spends time listing all the already-processed directories before it starts any Spark tasks that process new data. For example, it spends ~1.5 hours listing 2 years of directories that have already been processed, then reaches the new day of data and processes it in a few minutes. The time spent listing previously processed directories is unnecessary, and it is what I want to cut down. The Spark UI screenshot below shows this: the Spark executors are added around 10:30, and the actual Spark tasks start around 12:15. The time between 10:30 and 12:15 is spent entirely on DeltaFileOperations listing.

[Screenshot: Spark UI timeline of the job]

The structured streaming code I'm running is:

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", self.file_format)  # e.g. "avro"
    .option("cloudFiles.schemaLocation", self.checkpoint_location)  # where Auto Loader tracks the inferred schema
    .load(self.external_location_path)
    .select(schema)
)

# Wrap the write chain in parentheses so the multi-line method chain is valid Python
(
    df.writeStream.format("delta")
    .trigger(availableNow=True)  # process everything available, then stop
    .outputMode("append")
    .partitionBy("Year", "Month", "Day")
    .option("checkpointLocation", self.checkpoint_location)
    .option("mergeSchema", "true")
    .queryName(self.table_definition.table_name)
    .toTable(self.cache_table_path)
)
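One setting worth checking in the reader above is how Auto Loader discovers files. For directory listing mode, the Databricks docs describe a `cloudFiles.useIncrementalListing` option with values "auto", "true" and "false", while file notification mode (`cloudFiles.useNotifications`) discovers new files from storage events instead of repeated listing. Support and defaults vary by Databricks Runtime version, so treat this as a sketch under that assumption: the helper `autoloader_read_options` and the schema path are hypothetical, and the dict would be passed with `DataStreamReader.options(**opts)`.

```python
def autoloader_read_options(file_format: str, schema_location: str,
                            incremental_listing: str = "auto") -> dict:
    """Collect Auto Loader reader options in one place (hypothetical helper).

    Assumes the runtime still honours cloudFiles.useIncrementalListing;
    check the Auto Loader options docs for your Databricks Runtime version.
    """
    if incremental_listing not in {"auto", "true", "false"}:
        raise ValueError(f"unexpected useIncrementalListing value: {incremental_listing!r}")
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.useIncrementalListing": incremental_listing,
    }

# Hypothetical schema location, for illustration only
opts = autoloader_read_options("avro", "/tmp/checkpoints/example", "true")
print(opts)
# On Databricks the dict would be applied as:
# spark.readStream.format("cloudFiles").options(**opts).load(path)
```

Keeping the options in a dict makes it easy to compare runs with "auto" and "true" without editing the read chain.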
I've read that InMemoryFileIndex can list directories and find files more efficiently. Is there any way to configure the Spark job to use InMemoryFileIndex instead of the many DeltaFileOperations calls?

 

2 REPLIES

Kaniz
Community Manager

Hi @vmpmreistad, the structured streaming job uses Auto Loader, a Databricks feature.
- Auto Loader optimizes the listing and reading of files from cloud storage.
- Auto Loader uses the cloudFiles data source, built on DeltaFileOperations.
- There is no direct way to configure the job to use InMemoryFileIndex instead of DeltaFileOperations.
- InMemoryFileIndex is an internal class used for listing files and directories.
- Reorganizing files into fewer directories can make the listing operation more efficient.
- Writing a custom data source to use InMemoryFileIndex is complex and not recommended.
- The provided code uses the "cloudFiles" format, specific to Auto Loader and DeltaFileOperations.
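To make the "fewer directories" suggestion concrete, here is a sketch comparing the current per-second layout with a hypothetical hourly layout ({yyyy-mm-dd}/{hh}); the hourly scheme and the helper names are assumptions for illustration only.

```python
from datetime import datetime, timedelta

def per_second_dir(ts: datetime) -> str:
    # Current layout: {yyyy-mm-dd}/{hh-mm-ss}
    return f"{ts:%Y-%m-%d}/{ts:%H-%M-%S}"

def hourly_dir(ts: datetime) -> str:
    # Hypothetical coarser layout: {yyyy-mm-dd}/{hh}
    return f"{ts:%Y-%m-%d}/{ts:%H}"

def dirs_per_day(layout, day: datetime) -> int:
    """Count distinct leaf directories a layout produces for one day of per-second files."""
    return len({layout(day + timedelta(seconds=s)) for s in range(24 * 60 * 60)})

day = datetime(2023, 10, 3)
print(dirs_per_day(per_second_dir, day))  # 86400
print(dirs_per_day(hourly_dir, day))      # 24
```

An hourly layout would cut the number of directories to list by a factor of 3,600.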

vmpmreistad
New Contributor II

Hi @Kaniz and thank you for your response.

OK, I see that DeltaFileOperations is part of how Auto Loader operates. But I'm guessing it's not expected behavior for Auto Loader to run DeltaFileOperations to list directories that have already been listed?

As described in the post, in my case it spends almost 2 hours listing directories of data from 2021 to 2023 that previous runs of the structured streaming job have already processed. It seems to start processing new data only after it has listed all of these directories, up to the point where it hasn't processed data before. That makes a daily job very inefficient: almost 2 hours are spent listing directories and 3 minutes are spent processing the new day of data.

Is there a way to make it skip listing previously processed files, or am I doing something wrong?
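Conceptually, skipping already-processed directories only works if the lister can start after the last path it saw, which a lexically ordered layout allows. The sketch below illustrates the idea with a hypothetical `new_dirs` helper and an in-memory list standing in for a storage listing; it is not how Auto Loader is implemented internally.

```python
import bisect

def new_dirs(sorted_dirs: list[str], last_processed: str) -> list[str]:
    """Return directories that sort strictly after the last processed one.

    Assumes names sort lexically in arrival order, as {yyyy-mm-dd}/{hh-mm-ss}
    does, so only the new tail of the history needs to be visited.
    """
    start = bisect.bisect_right(sorted_dirs, last_processed)
    return sorted_dirs[start:]

dirs = ["2023-10-01/23-59-59", "2023-10-02/00-00-00",
        "2023-10-02/00-00-01", "2023-10-03/09-05-48"]
print(new_dirs(dirs, "2023-10-02/00-00-00"))
# ['2023-10-02/00-00-01', '2023-10-03/09-05-48']
```

Binary search finds the starting point in O(log n) comparisons, whereas the behavior described in the thread resembles walking the whole history on every run.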
