TL;DR: How do I make a Structured Streaming job that uses Auto Loader read files via InMemoryFileIndex instead of DeltaFileOperations?
I'm running a Structured Streaming job against an external storage account (ADLS Gen2, abfss://) that holds Avro files in a /path/{yyyy-mm-dd}/{hh-mm-ss} directory structure.
When I run the job, I can see from the log4j logs that it runs a DeltaFileOperations listing on every directory and subdirectory. As there is one subdirectory per second (about 86,400 per day) and we have a bit more than 2 years of data, there are around 75 million subdirectories. This floods the logs and takes a long time to list.
23/10/03 09:05:48 INFO DeltaFileOperations: Listing abfss://{container}@{storage_account}.dfs.core.windows.net/path/2021-05-20/22-09-03
The main problem, however, is that the job spends its time listing all the already-processed directories before it starts any actual Spark tasks that process new data. For example, it spends ~1.5 hours listing 2 years of directories that have already been processed, then reaches the new day of data and processes it in a few minutes. The time spent listing previously processed directories is unnecessary, and that is what I want to cut down. This is shown in the screenshot from the Spark UI below: the Spark executors are added around 10:30, but the actual Spark tasks only start around 12:15. The time between 10:30 and 12:15 is spent purely on DeltaFileOperations listing.
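Since the {yyyy-mm-dd}/{hh-mm-ss} directory names sort lexically, one option I'm wondering about is Auto Loader's incremental listing. A minimal sketch of what I mean, assuming the cloudFiles.useIncrementalListing option described in the Databricks docs applies to this layout (the variables are placeholders):

    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "avro")
        .option("cloudFiles.schemaLocation", checkpoint_location)
        # Only list paths that sort after what has already been seen,
        # instead of re-listing everything ("auto" is the default).
        .option("cloudFiles.useIncrementalListing", "true")
        .load(external_location_path)
    )

I haven't verified whether this actually avoids the per-directory DeltaFileOperations calls, though.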
The actual Structured Streaming code I'm currently running is shown below.
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", self.file_format)
        .option("cloudFiles.schemaLocation", self.checkpoint_location)
        .load(self.external_location_path)
        .select(schema)
    )
    (
        df.writeStream.format("delta")
        .trigger(availableNow=True)
        .outputMode("append")
        .partitionBy("Year", "Month", "Day")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .queryName(self.table_definition.table_name)
        .toTable(self.cache_table_path)
    )
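I've also seen that Auto Loader has a file notification mode that avoids directory listing entirely. A sketch of how I understand it would look on Azure, going by the Databricks docs (all credential variables are placeholders, and I'm not sure it helps with data that already exists):

    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "avro")
        .option("cloudFiles.schemaLocation", checkpoint_location)
        # Discover new files from Event Grid notifications on a queue
        # instead of listing directories.
        .option("cloudFiles.useNotifications", "true")
        # Service principal Auto Loader uses to create the notification
        # resources (placeholders).
        .option("cloudFiles.clientId", client_id)
        .option("cloudFiles.clientSecret", client_secret)
        .option("cloudFiles.tenantId", tenant_id)
        .option("cloudFiles.subscriptionId", subscription_id)
        .option("cloudFiles.resourceGroup", resource_group)
        .load(external_location_path)
    )

Even then, my understanding is that the existing two years of files would still require an initial listing pass.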
I've read that InMemoryFileIndex can list directories and discover files more efficiently. Is there any way I can configure the Spark job to use an InMemoryFileIndex instead of the many DeltaFileOperations listing calls?
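For comparison, my understanding is that a plain batch read of the same path does its listing through InMemoryFileIndex, e.g. (a sketch; I haven't benchmarked this against the streaming job):

    # A non-streaming read of the same path; the listing for this shows up
    # in the logs as InMemoryFileIndex rather than DeltaFileOperations.
    df_batch = spark.read.format("avro").load(external_location_path)

Is there an equivalent way to get the Auto Loader stream to list this way?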