cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to make structured streaming with autoloader efficiently and incrementally read files

vmpmreistad
New Contributor II

TLDR format: How do I make a structured streaming job using autoloader read files using InMemoryFileIndex instead of DeltaFileOperations?

I'm running a structured streaming job from an external (ADLS Gen2, abfss://), storage account which has avro files in a structure of /path/{yyyy-mm-dd}/{hh-mm-ss}. 

When I run the job I can see from the log4j logs that it runs a DeltaFileOperations on every directory and subdirectory. As there is one subdirectory for every second and we have a bit more than 2 years of data, there are 75 million subdirectories. This fills up the logs and takes a long time to list out.

23/10/03 09:05:48 INFO DeltaFileOperations: Listing abfss:/{container}@{storage_account}.dfs.core.windows.net/path/2021-05-20/22-09-03

The main problem however is that it the the jobs spends time listing out all the already processed directories, before it starts any actual spark tasks of processing new data. For example it spends ~1.5 hours listing 2 years of  directories that are already processed, then it comes to the new day of data and processed that in a few minutes. This time spent listing out previously processed directories is unecessary, and is what I want to cut down. This is shown in the screenshot from the Spark UI below, where the Spark executors are added around 10:30, and the actual spark tasks start around 12:15. The time between 10:30 and 12:15 is just spend on DeltaFileOperations listing operations.

vmpmreistad_0-1696330325302.png

The actual structured streaming code I'm running can be found here.

df = (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", self.file_format)
            .option("cloudFiles.schemaLocation", self.checkpoint_location)
            .load(self.external_location_path)
            .select(schema)
)
df.writeStream.format("delta")
            .trigger(availableNow=True)
            .outputMode("append")
            .partitionBy("Year", "Month", "Day")
            .option("checkpointLocation", self.checkpoint_location)
            .option("mergeSchema", "true")
            .queryName(self.table_definition.table_name)
            .toTable(self.cache_table_path)
)
I've read that InMemoryFileIndex can be used to more efficiently list directories and look for files. Is there any way I can configure the spark job to use a InMemoryFileIndex spark job instead of the many DeltaFileOperations?

 

4 REPLIES 4

Hi @Retired_mod and thank you for your response.

Ok, I see that DeltaFileOperations are a part of how Auto Loader operates. But I'm guessing it's not expected behavior that Auto Loader should run DeltaFileOperations to list directories that have been previously outlined?

As described in the post, in my case it is using almost 2 hours to list out directories of data from 2021 to 2023, which has already been processed by previous runs by the structured streaming job. To me, it seems like it only starts processing new data after it has listed out all of these  directories up to where it hasn't processed data before. That makes it very inefficient if running as a daily job where almost 2 hours are spent on listing directories and 3 minutes is spent on processing the new day of data. 

Is there a way to allow it to skip listing previously processed files, or is there something that I must be doing wrong?

Wundermobility
New Contributor II

Hi @Kaniz_Fatma I am facing a similar issue with one of my pipeline that is being executed on an hourly basis and was running fine (with execution time getting to max 15 min for every hourly run) using the below code:

df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "text") \
.option("cloudFiles.useNotifications", True) \
.option("cloudFiles.region", "eu-central-1") \
.option("cloudFiles.queueUrl", QUEUE_URL) \
.schema(schema) \
.load()

<some regex commands on the df>
partitions = ["year", "month", "day"]
df.writeStream\
.format("delta")\
.trigger(once=True)\
.outputMode("append")\
.option("checkpointLocation", CHECKPOINT_PREFIX)\
.option("path", DATA_PREFIX)\
.partitionBy(*partitions)\
.start()\
.awaitTermination()


now the code gets stuck on the writestream for hours and does not load any data.

Can you guide me on how to debug it 🙂

 

cgrant
Databricks Employee
Databricks Employee

@vmpmreistad 

It is not possible to modify how listing is done. The standard recommendation is to switch from directory listing mode to file notification mode with large directories. If you have any control over the structure of your source data, I do not recommend having data at a minute/second granularity, especially on ADLS gen2.

cgrant
Databricks Employee
Databricks Employee

@Wundermobility 

The best way to debug this is to look at the Spark UI to see if a job has been launched. One thing to call out is that trigger.Once is deprecated - we recommend using trigger.availableNow instead to avoid overwhelming the cluster.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group