Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader with file notification mode sleeps for 5000ms multiple times

Abdul-Mannan
New Contributor III

Using DBR 15.4, I'm ingesting streaming data from ADLS using Auto Loader with file notification mode enabled. This is older code that uses a foreachBatch sink to process the data before merging it into tables in Delta Lake.

Issue

The streaming job uses the available now trigger, but rather than processing the data in one go, it sleeps for 5000ms multiple times before closing the stream.

AbdulMannan_0-1733760650416.png

Expectation

With the available now trigger, it should process the available data and then close the stream, rather than waiting for 5000ms multiple times (5 to 6 times), which creates an undesired execution delay.

Here are the Auto Loader options used with the streaming job:

{
    'cloudFiles.format': 'json', 
    'cloudFiles.includeExistingFiles': 'false', 
    'cloudFiles.maxFilesPerTrigger': 1000, 
    'cloudFiles.maxBytesPerTrigger': '2g', 
    'cloudFiles.useNotifications': 'true', 
    'cloudFiles.subscriptionId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.tenantId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.clientId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.clientSecret': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.resourceGroup': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 
    'cloudFiles.fetchParallelism': 10, 
    'cloudFiles.resourceTag.streaming_job_autoloader_stream_id': 'databricks-event-xxxxxxxxxxxxxxx', 
    'cloudFiles.queueName': 'databricks-event-xxxxxxxxxxxxxxx', 
    'pathGlobfilter': '*.json'
}

Question

Is this the default behaviour with file notification mode in autoloader?
Is it possible to customize/remove the delay?

Thanks

14 REPLIES

VZLA
Databricks Employee

Hi @Abdul-Mannan thanks for your question!

To control the 5000ms default value, you can use the cloudFiles.queueFetchInterval option. This option allows you to specify the interval at which Auto Loader fetches messages from the queueing service.

Here is an example of how you can set this option in your Auto Loader configuration:

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.queueFetchInterval", "500ms")  # Set the desired interval here
      .load("path/to/source"))

# Parentheses let the chained calls span multiple lines.
(df.writeStream
   .format("delta")
   .option("checkpointLocation", "path/to/checkpoint")
   .start("path/to/destination"))

In this example, the cloudFiles.queueFetchInterval is set to 500ms, but you can adjust this value to meet your specific requirements. This setting controls how frequently Auto Loader fetches new messages from the queue, which can help in reducing the delay you are experiencing.
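For the setup described in the question (an existing options dictionary, a foreachBatch sink, and the available now trigger), the same idea could be sketched as below. This is only an illustration: `build_autoloader_options`, `run_available_now`, and `process_batch` are hypothetical names, the option name `cloudFiles.queueFetchInterval` is taken from this thread rather than confirmed against the Auto Loader docs, and schema handling is left out.

```python
from typing import Any, Callable, Dict


def build_autoloader_options(base: Dict[str, Any], fetch_interval: str) -> Dict[str, str]:
    """Return a copy of `base` with the queue fetch interval added.

    Values are converted to strings, which is how Spark stores reader options.
    """
    options = {key: str(value) for key, value in base.items()}
    # Option name as given in this thread; treat it as an assumption.
    options["cloudFiles.queueFetchInterval"] = fetch_interval
    return options


def run_available_now(spark, source_path: str, checkpoint_path: str,
                      options: Dict[str, str],
                      process_batch: Callable[[Any, int], None]) -> None:
    """Start an Auto Loader stream with the availableNow trigger and wait for it."""
    stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**options)
        .load(source_path)
    )
    query = (
        stream.writeStream
        .foreachBatch(process_batch)  # hypothetical merge logic supplied by the caller
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
    # Blocks until the available-now run has finished and the stream has closed.
    query.awaitTermination()
```

Calling `build_autoloader_options(existing_options, "500ms")` leaves the original dictionary untouched, so the interval can be changed between runs without editing the base configuration.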

Hope it helps!

Abdul-Mannan
New Contributor III

Hi @VZLA 

Thank you for your reply.


I could not find this option in the Auto Loader docs. Where can I find more details on it?

Thanks

VZLA
Databricks Employee

Erik
Valued Contributor III

This documentation gives the impression of being about an old deprecated feature (e.g. the line "The ABS-AQS source is deprecated. For new streams, we recommend using Auto Loader instead."). If these config options are still relevant for Auto Loader, I recommend that you update the Auto Loader documentation to mention them 😉

 

VZLA
Databricks Employee

Thanks @Erik. Absolutely, I agree that the Auto Loader documentation should be updated.

This isn't available on the Azure website, but for the Databricks docs you can send feedback to doc-feedback@databricks.com, which is linked in each documentation section. The Documentation team will gladly review it and take care of the fix (adding, updating, or removing content).

Abdul-Mannan
New Contributor III

Is it possible to close the stream on the first try when there is no data in the queue?

Please suggest if there is a config which can do it.

Thanks

VZLA
Databricks Employee

Unfortunately, I don't think this is possible or configurable. With the "available now" trigger, Auto Loader checks multiple times before closing if it finds no data. Reducing the cloudFiles.queueFetchInterval and enabling async fetch are the main options to minimize the delay.

Abdul-Mannan
New Contributor III

I tried using the option 

cloudFiles.queueFetchInterval

but it is still taking a minute to process the stream even though there is no data.

AbdulMannan_0-1733823590639.png

 

 

VZLA
Databricks Employee

Can you please try setting "spark.databricks.cloudFiles.useAsyncFetch true" at the cluster level?

I'm not sure whether it will still be applied this way, but if restarting the cluster is not possible, try the session-level config:
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")

When enabled, Autoloader will use an optimized async client for fetching messages. This allows the FileEventFetcher to interact with the queueing service asynchronously, potentially reducing delays.

Abdul-Mannan
New Contributor III

I tried the following options:

# with autoloader options
cloudFiles.fetchParallelism = 10
cloudFiles.queueFetchInterval = "500ms"
# setting this at the start of notebook execution
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")

It seems to be stuck and not making any progress.

AbdulMannan_0-1733843250269.png

There is no data in the ADLS queue for this stream, but it was stuck there for more than 40 minutes before I cancelled the task.

AbdulMannan_1-1733843394562.png

If I disable or do not set this property

spark.databricks.cloudFiles.useAsyncFetch

it processes the stream but still takes a minute even though the queue is empty.

 

VZLA
Databricks Employee

The logging behavior appears normal and is influenced by the sync/async property configuration. However, the 40+ minute runtime is unusual and could indicate delays related to producer/consumer states, ADLS queue fetches, or metadata cleanup tasks.

To investigate further, I recommend raising a support ticket with the Driver logs and Driver Thread Dumps attached for a detailed root cause analysis.
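Until the root cause is known, a job could cap how long it waits instead of hanging for 40+ minutes. The sketch below is one possible guard, not an official recommendation: `await_or_stop` is a hypothetical helper, and it relies only on the standard PySpark `StreamingQuery` members `awaitTermination(timeout)`, `status`, `lastProgress`, and `stop()`.

```python
def await_or_stop(query, timeout_seconds: int) -> bool:
    """Wait for a streaming query and stop it if it is still running after the timeout.

    Returns True if the query finished on its own, False if it had to be stopped.
    """
    # awaitTermination(timeout) returns whether the query terminated within the timeout.
    finished = query.awaitTermination(timeout_seconds)
    if not finished:
        # Print the last known state (useful for a support ticket) before stopping.
        print("status:", query.status)
        print("lastProgress:", query.lastProgress)
        query.stop()
    return finished
```

The return value lets the calling task decide whether to fail, retry, or continue when the stream did not close by itself.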

Abdul-Mannan
New Contributor III

Thank you @VZLA for your support. I'll proceed with the next steps.

VZLA
Databricks Employee

Sure, thanks for helping improve our product, looking forward to assisting you through our support channel.

Abdul-Mannan
New Contributor III

@VZLA 
I just tested it, and this Auto Loader behaviour with the available now trigger and file notification enabled appears to be the same in a DLT pipeline: it sleeps 7 times, for 5000ms each time, before finally closing the stream, even though there is no data in the queue. Each stream takes at least 1 minute even when the queue used for file notification has no data.

Is there any other way to avoid this behaviour?
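For a rough sense of how much of that minute is spent sleeping, the numbers reported in this thread (5 to 7 sleeps of 5000ms each) can be multiplied out. The helper name `idle_sleep_seconds` is hypothetical, and the calculation assumes the sleeps run back to back with nothing else in between.

```python
def idle_sleep_seconds(sleep_count: int, interval_ms: int) -> float:
    """Total time spent sleeping, in seconds."""
    return sleep_count * interval_ms / 1000.0


# Sleep counts reported in the thread: 5 to 6 in the first post, 7 in the DLT test.
for count in (5, 6, 7):
    print(count, "sleeps x 5000 ms =", idle_sleep_seconds(count, 5000), "s")
```

On these figures the sleeps account for roughly 25 to 35 seconds of each run, so the remainder of the reported one minute comes from something other than the sleeps themselves.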
