Data Engineering

Autoloader with file notification mode sleeps for 5000ms multiple times

Abdul-Mannan
New Contributor III

Using DBR 15.4, I'm ingesting streaming data from ADLS using Auto Loader with file notification mode enabled. This is older code that uses a foreachBatch sink to process the data before merging it into Delta Lake tables.

Issue

The streaming job uses the availableNow trigger, but rather than processing the data in one go, it sleeps for 5000ms multiple times before closing the stream.

[Screenshot: driver log showing repeated 5000ms sleeps]

Expectation

With the availableNow trigger, it should process the available data and then close the stream, rather than waiting 5000ms five or six times, which creates an undesired execution delay.

Here are the Auto Loader options used with the streaming job:

{
    'cloudFiles.format': 'json', 
    'cloudFiles.includeExistingFiles': 'false', 
    'cloudFiles.maxFilesPerTrigger': 1000, 
    'cloudFiles.maxBytesPerTrigger': '2g', 
    'cloudFiles.useNotifications': 'true', 
    'cloudFiles.subscriptionId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.tenantId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.clientId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.clientSecret': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.resourceGroup': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.fetchParallelism': 10, 
    'cloudFiles.resourceTag.streaming_job_autoloader_stream_id': 'databricks-event-xxxxxxxxxxxxxxx', 
    'cloudFiles.queueName': 'databricks-event-xxxxxxxxxxxxxxx', 
    'pathGlobFilter': '*.json'
}
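
For context, here is a minimal sketch of how a job like this is typically wired together with the availableNow trigger and a foreachBatch sink (the paths, checkpoint location, and merge logic below are illustrative placeholders, not taken from the actual job):

autoloader_options = {
    'cloudFiles.format': 'json',
    'cloudFiles.useNotifications': 'true',
    # ... plus the remaining cloudFiles.* options from the dict above
}

def merge_batch(batch_df, batch_id):
    # Placeholder for the real MERGE into the Delta tables.
    batch_df.write.format('delta').mode('append').save('/mnt/target')  # hypothetical path

(spark.readStream
    .format('cloudFiles')
    .options(**autoloader_options)
    .load('abfss://container@account.dfs.core.windows.net/landing')  # hypothetical ADLS path
    .writeStream
    .foreachBatch(merge_batch)
    .option('checkpointLocation', '/mnt/checkpoints/stream')  # hypothetical
    .trigger(availableNow=True)
    .start()
    .awaitTermination())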

Question

Is this the default behaviour of file notification mode in Auto Loader?
Is it possible to customize or remove the delay?

Thanks

9 REPLIES

VZLA
Databricks Employee

Hi @Abdul-Mannan, thanks for your question!

To control the 5000ms default value, you can use the cloudFiles.queueFetchInterval option. This option allows you to specify the interval at which Auto Loader fetches messages from the queueing service.

Here is an example of how you can set this option in your Auto Loader configuration:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.queueFetchInterval", "500ms")  # set the desired interval here
      .load("path/to/source"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "path/to/checkpoint")
   .start("path/to/destination"))

In this example, the cloudFiles.queueFetchInterval is set to 500ms, but you can adjust this value to meet your specific requirements. This setting controls how frequently Auto Loader fetches new messages from the queue, which can help in reducing the delay you are experiencing.

Hope it helps!

Abdul-Mannan
New Contributor III

Hi @VZLA 

Thank you for your reply.


I could not find this option in the Auto Loader docs. Where can I find more details about it?

Thanks

VZLA
Databricks Employee

Abdul-Mannan
New Contributor III

Is it possible to close the stream on the first try when there is no data in the queue?

Please suggest a config that can do this, if one exists.

Thanks

VZLA
Databricks Employee
Databricks Employee

Unfortunately, I don't think this is possible or configurable. With the availableNow trigger, Auto Loader checks multiple times before closing when it finds no data. Reducing cloudFiles.queueFetchInterval and enabling async fetch are the main options for minimizing the delay.

Abdul-Mannan
New Contributor III

I tried using the cloudFiles.queueFetchInterval option, but it is still taking a minute to process the stream even though there is no data.

[Screenshot: stream still taking about a minute with an empty queue]
VZLA
Databricks Employee
Databricks Employee

Can you please try setting spark.databricks.cloudFiles.useAsyncFetch to true at the cluster level?

I'm not sure if this will still be applied, but if restarting the cluster is not possible, then try it via a session-level config:
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")

When enabled, Auto Loader uses an optimized async client for fetching messages. This allows the FileEventFetcher to interact with the queueing service asynchronously, potentially reducing delays.
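
Putting both suggestions together, a minimal sketch (the source path is an illustrative placeholder, and as noted above it is uncertain whether the session-level setting takes effect without a cluster restart):

# Assumption: the session-level setting is honored without a cluster restart.
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .option("cloudFiles.queueFetchInterval", "500ms")  # shorter fetch interval
      .load("path/to/source"))  # hypothetical path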

Abdul-Mannan
New Contributor III

I tried the following options:

# set via the Auto Loader options
cloudFiles.fetchParallelism = 10
cloudFiles.queueFetchInterval = "500ms"
# set at the start of notebook execution
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")

With these settings, the stream seems to be stuck and not making any progress.

[Screenshot: stream stuck with no progress]

There is no data in the ADLS queue for this stream, but it was stuck there for more than 40 minutes before I cancelled the task.

[Screenshot: task cancelled after 40+ minutes]

If I disable or do not set the spark.databricks.cloudFiles.useAsyncFetch property, it processes the stream but still takes a minute even though the queue is empty.

VZLA
Databricks Employee

The logging behavior appears normal and is influenced by the sync/async property configuration. However, the 40+ minute runtime is unusual and could indicate delays related to producer/consumer states, ADLS queue fetches, or metadata cleanup tasks.

To investigate further, I recommend raising a support ticket with the Driver logs and Driver Thread Dumps attached for a detailed root cause analysis.
