Monday
Using DBR 15.4, I'm ingesting streaming data from ADLS using Auto Loader with file notification mode enabled. This is older code that uses a foreachBatch sink to process the data before merging it into Delta Lake tables.
Issue
The streaming job uses the available-now trigger, but rather than processing the data in one go, it sleeps for 5000 ms multiple times before closing the stream.
Expectation
With the available-now trigger, it should process the available data and then close the stream, rather than waiting for 5000 ms multiple times (5 to 6 times), which creates an undesired execution delay.
Here are the Auto Loader options used with the streaming job:
{
    'cloudFiles.format': 'json',
    'cloudFiles.includeExistingFiles': 'false',
    'cloudFiles.maxFilesPerTrigger': 1000,
    'cloudFiles.maxBytesPerTrigger': '2g',
    'cloudFiles.useNotifications': 'true',
    'cloudFiles.subscriptionId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'cloudFiles.tenantId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'cloudFiles.clientId': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'cloudFiles.clientSecret': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'cloudFiles.resourceGroup': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'cloudFiles.fetchParallelism': 10,
    'cloudFiles.resourceTag.streaming_job_autoloader_stream_id': 'databricks-event-xxxxxxxxxxxxxxx',
    'cloudFiles.queueName': 'databricks-event-xxxxxxxxxxxxxxx',
    'pathGlobfilter': '*.json'
}
Question
Is this the default behaviour with file notification mode in autoloader?
Is it possible to customize/remove the delay?
Thanks
Monday
Hi @Abdul-Mannan thanks for your question!
To control the 5000ms default value, you can use the cloudFiles.queueFetchInterval option. This option allows you to specify the interval at which Auto Loader fetches messages from the queueing service.
Here is an example of how you can set this option in your Auto Loader configuration:
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.queueFetchInterval", "500ms")  # Set the desired interval here
    .load("path/to/source"))

# Parentheses let the method chain span multiple lines
(df.writeStream
    .format("delta")
    .option("checkpointLocation", "path/to/checkpoint")
    .start("path/to/destination"))
In this example, cloudFiles.queueFetchInterval is set to 500ms, but you can adjust this value to meet your specific requirements. This setting controls how frequently Auto Loader fetches new messages from the queue, which can help reduce the delay you are experiencing.
Hope it helps!
Monday
Hi @VZLA
Thank you for your reply.
I could not find this option in the Auto Loader docs. Where can I find more details on it?
Thanks
Tuesday
You can find it here:
https://docs.azure.cn/en-us/databricks/spark/latest/structured-streaming/aqs
19 hours ago
This documentation gives the impression of being about an old deprecated feature (e.g. the line "The ABS-AQS source is deprecated. For new streams, we recommend using Auto Loader instead."). If these config options are still relevant for autoloader I recommend that you update the auto loader documentation to mention them 😉
15 hours ago
Thanks @Erik. Absolutely, I agree that the Auto Loader documentation should be updated.
That would not be on the Azure website, though. For the Databricks docs, you can send your feedback to doc-feedback@databricks.com, which is linked in each documentation section; the Documentation team will gladly review it and add, update, or remove content as needed.
Tuesday
Is it possible to close the stream on the first try when there is no data in the queue?
Please suggest a config that can do this, if one exists.
Thanks
Tuesday
Unfortunately, I don't think this is possible or configurable. With the "available now" trigger, Auto Loader checks multiple times before closing if it finds no data. Reducing the cloudFiles.queueFetchInterval and enabling async fetch are the main options to minimize the delay.
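As a rough illustration, the two settings mentioned above could be applied to an available-now stream along these lines. This is only a sketch: the option and config names are the ones quoted in this thread (cloudFiles.queueFetchInterval does not appear in the main Auto Loader docs, so treat it as unverified), while with_faster_queue_polling, start_available_now, and their arguments are hypothetical names made up for the example.

```python
def with_faster_queue_polling(options, interval="500ms"):
    """Return a copy of the Auto Loader options with the queue fetch interval set.

    The option name comes from this thread; it is an assumption that it
    applies to the Auto Loader version in use.
    """
    tuned = dict(options)  # copy so the caller's dict is left untouched
    tuned["cloudFiles.queueFetchInterval"] = interval
    return tuned


def start_available_now(spark, options, source_path, checkpoint_path,
                        process_batch, use_async_fetch=False):
    """Start an available-now Auto Loader stream with a foreachBatch sink.

    All arguments are placeholders for illustration; process_batch is a
    function taking (batch_df, batch_id).
    """
    if use_async_fetch:
        # Session-level setting suggested in this thread; it may only take
        # effect when set at the cluster level.
        spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**options)
        .load(source_path)
        .writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
```

Only the first helper runs without a Spark session; the second needs a Databricks cluster with Auto Loader available. Async fetch is left off by default here so it can be toggled separately when comparing run times.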
Tuesday
I tried using the cloudFiles.queueFetchInterval option, but the stream still takes a minute to process even though there is no data.
Tuesday - last edited Tuesday
Can you please try setting "spark.databricks.cloudFiles.useAsyncFetch true" at the cluster level?
I'm not sure whether it will still be applied, but if restarting the cluster is not possible, try the session-level config:
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")
When enabled, Auto Loader uses an optimized async client for fetching messages. This allows the FileEventFetcher to interact with the queueing service asynchronously, potentially reducing delays.
Tuesday
I tried the following options:
# with autoloader options
cloudFiles.fetchParallelism = 10
cloudFiles.queueFetchInterval = "500ms"
# setting this at the start of notebook execution
spark.conf.set("spark.databricks.cloudFiles.useAsyncFetch", "true")
With these settings, the stream seems to be stuck and makes no progress. There is no data in the ADLS queue for this stream, but it stayed stuck for more than 40 minutes until I cancelled the task.
If I disable or do not set the spark.databricks.cloudFiles.useAsyncFetch property, it processes the stream but still takes a minute even though the queue is empty.
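While the hang is being investigated, one way to keep a stuck run from blocking the job for 40 minutes is to wait on the query with a deadline and stop it if the deadline passes. This is a minimal sketch, not a fix for the underlying issue: it relies on StreamingQuery.awaitTermination(timeout) returning False when the timeout elapses, and run_with_deadline is a hypothetical helper name.

```python
def run_with_deadline(query, timeout_seconds):
    """Wait for a streaming query to finish; stop it if the deadline passes.

    Returns True if the query terminated on its own within the timeout,
    False if it was still running and had to be stopped.
    """
    finished = query.awaitTermination(timeout_seconds)
    if not finished:
        # The query is still active after the timeout, so stop it explicitly.
        query.stop()
    return finished
```

For example, run_with_deadline(query, 300) would give an available-now run five minutes before stopping it, where query is the object returned by writeStream...start().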
Tuesday
The logging behavior appears normal and is influenced by the sync/async property configuration. However, the 40+ minute runtime is unusual and could indicate delays related to producer/consumer states, ADLS queue fetches, or metadata cleanup tasks.
To investigate further, I recommend raising a support ticket with the Driver logs and Driver Thread Dumps attached for a detailed root cause analysis.
yesterday
Thank you @VZLA for your support. I'll proceed with next steps.
15 hours ago
Sure, thanks for helping improve our product, looking forward to assisting you through our support channel.