
Autoloader File Notification Mode not working as expected

ibrahim21124
New Contributor III

I am using the code below to read from a source location in an ADLS Gen2 Azure storage container.

 

core_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("multiLine", "false")
    .option("cloudFiles.allowOverwrites", "true")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.fetchParallelism", 1)
    .load("<source_path>")  # placeholder for the actual ADLS Gen2 source path (omitted here)
)
 
I am reading files with names of the format
xxxxx.abc.json

When I do a full refresh I get all the files in the given container, but when I upload a new file and run incrementally, the new file is not picked up. Is there any option or argument that can help me deal with this scenario?
 
7 REPLIES

szymon_dybczak
Contributor III

Hi,

Could you check whether Databricks has created an Event Grid resource in the resource group where your storage account is located, and whether it has created a queue? (Screenshots of the queue and Event Grid resource below.)

[Screenshots: the storage queue and the Event Grid resource created by Auto Loader]

 



Assuming you want Databricks Auto Loader to set up the notification service and queue service for you, you need a service principal with the required permissions to make it work (more on that at this link: What is Auto Loader file notification mode? - Azure Databricks | Microsoft Learn).

Also, make sure that your file names do not begin with an underscore '_'; otherwise the files will be ignored by Auto Loader.

Below is the code I've used to set up file notification mode and test incremental loading. It worked without issue.
You can try it and check whether it works for you.

 

 

checkpoint_path = "abfss://your_container@storage_acc_name.dfs.core.windows.net/_checkpoint/dev_table"

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .option("cloudFiles.useNotifications", "true")     
  .option("multiLine", "false")
  .option('cloudFiles.allowOverwrites', 'true')
  .option("cloudFiles.useNotifications", 'true')
  .option("cloudFiles.fetchParallelism", 1)
  .option("cloudFiles.subscriptionId", "subscription_id")
  .option("cloudFiles.tenantId", "tenant_id")
  .option("cloudFiles.clientId", "client_id")
  .option("cloudFiles.clientSecret", "client_secret")
  .option("cloudFiles.resourceGroup", "resource_group_name") 
  .load("abfss://your_container@storage_acc_name.dfs.core.windows.net/path_to_files")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable("catalog_name.schema_name_table_name"))

 

 


Also, a handy way to check which files have been discovered by an Auto Loader stream is the function below:

SELECT * FROM cloud_files_state('path/to/checkpoint');

 

 

 

ibrahim21124
New Contributor III

Hello @szymon_dybczak,

I have granted the required permissions to the service principal, and the queue has been created in the storage account. We are not actually using the writeStream functionality directly; we are on Unity Catalog and load the data through a Delta Live Tables pipeline into streaming tables. In that context, how can we use the checkpoint option?
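For reference, the source in our DLT pipeline is defined roughly along the lines of the sketch below; the table, container, and path names are placeholders, not our real ones:

import dlt

# Placeholder source path; the real container and path differ.
SOURCE_PATH = "abfss://your_container@storage_acc_name.dfs.core.windows.net/path_to_files"

@dlt.table(name="core_events_raw")  # placeholder table name
def core_events_raw():
    # DLT manages the checkpoint for this streaming table itself, so there is
    # no explicit writeStream or checkpointLocation in the pipeline code.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("multiLine", "false")
        .option("cloudFiles.allowOverwrites", "true")
        .option("cloudFiles.useNotifications", "true")
        .load(SOURCE_PATH)
    )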

One more observation from my side: when the queue is cleared or deleted and the pipeline is then run incrementally, it does pick up the new files.

Thanks for the response.

 

Regards,

Ibrahim

szymon_dybczak
Contributor III

Hi @ibrahim21124,

Since you are using DLT, the write and checkpoint steps are handled for you by the DLT framework. So let's try to do it step by step and monitor what happens inside the checkpoint after each pipeline execution.

What I would try first is to once again clear/delete the queue and run the pipeline. Then I would check whether all files were correctly saved in the RocksDB state store using the function below. If you use Delta Live Tables, the checkpoints are stored under the storage location specified in the DLT settings; each table gets a dedicated directory under storage_location/checkpoints/dlt_table_name.

SELECT * FROM cloud_files_state('path/to/checkpoint');

Then I would upload additional files to storage and use the function above once again to check whether Auto Loader detected the new files and added them to the state store.
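For example, you could snapshot the state before and after uploading the new files; the checkpoint path below is just a placeholder for the directory under your DLT storage location:

# Placeholder: <dlt_storage_location>/checkpoints/<dlt_table_name>
checkpoint = "abfss://your_container@storage_acc_name.dfs.core.windows.net/dlt_storage/checkpoints/dlt_table_name"

# Files Auto Loader has committed to its state store so far.
before_paths = {r.path for r in spark.sql(f"SELECT path FROM cloud_files_state('{checkpoint}')").collect()}

# ... upload the new files and run the pipeline again, then:
after_paths = {r.path for r in spark.sql(f"SELECT path FROM cloud_files_state('{checkpoint}')").collect()}
print(sorted(after_paths - before_paths))  # files discovered since the first snapshot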

One additional question: do the new files get stuck in the queue? I mean, after you upload a new file and Event Grid sends the notification to the storage queue, do you see those messages in the queue, or is the queue empty?
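If you'd rather check the queue from a notebook than in the Azure portal, a sketch along these lines should work; it assumes the azure-storage-queue package is installed, and the connection string and queue name below are placeholders:

# Sketch only: peek at the notification queue without dequeuing messages,
# so the Auto Loader stream is not affected.
from azure.storage.queue import QueueClient

conn_str = "<storage_account_connection_string>"   # placeholder
queue_name = "<queue_created_by_databricks>"       # placeholder

queue = QueueClient.from_connection_string(conn_str, queue_name)

for msg in queue.peek_messages(max_messages=10):
    # Message content is the Event Grid notification payload (it may be base64-encoded JSON).
    print(msg.content)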

 

Regards,

slash

ibrahim21124
New Contributor III

Hey @szymon_dybczak,

I checked my storage container and I don't see any folder at the given checkpoint path. Do I need to set anything up differently for that? My concern is that the incremental load was working well in directory listing mode, but after switching to file notification mode it stopped picking up new files.

We are also not using a RocksDB state store explicitly. We are using files stored in an ADLS Gen2 storage container as the input and then loading them into streaming Delta Live Tables in Unity Catalog.

Thanks and Regards,

Ibrahim

szymon_dybczak
Contributor III

Hi @ibrahim21124,

If you are using Auto Loader, you are also implicitly using RocksDB: Auto Loader stores its state data at the checkpoint location in a RocksDB key-value store.
To find your checkpoint location, follow Jacek Laskowski's answer: https://stackoverflow.com/a/75716559
Also, under the hood Auto Loader is based on the Spark Structured Streaming model. I think it would be good for you to get familiar with the documentation; it will definitely clarify a lot of concepts like checkpointing and the state store.

Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
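And to quickly see which checkpoint directories your pipeline has created under its storage location, something along these lines should work in a notebook (the storage location below is just a placeholder):

# Placeholder: the storage location configured in the DLT pipeline settings.
dlt_storage_location = "abfss://your_container@storage_acc_name.dfs.core.windows.net/dlt_storage"

# Each streaming table managed by the pipeline gets its own directory
# under <storage_location>/checkpoints/.
for entry in dbutils.fs.ls(f"{dlt_storage_location}/checkpoints"):
    print(entry.path)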

 

Was there a resolution to this issue? We are experiencing similar symptoms after switching to file notification mode. It would be very helpful to the community if someone could update this thread with the resolution, if one was found.

Also, you don't need to know the checkpoint location for the cloud_files_state function call; you can just pass TABLE(table_name) to the call instead.
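For example, for a Unity Catalog streaming table it would look roughly like this (the three-level table name is a placeholder):

# Placeholder table name; pass the streaming table instead of a checkpoint path.
spark.sql(
    "SELECT * FROM cloud_files_state(TABLE(catalog_name.schema_name.table_name))"
).show(truncate=False)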

Rishabh_Tiwari
Databricks Employee

Hi @ibrahim21124 ,

Thank you for reaching out to our community! We're here to help you. 

To ensure we provide you with the best support, could you please take a moment to review the response and choose the one that best answers your question? Your feedback not only helps us assist you better but also benefits other community members who may have similar questions in the future.

If you found the answer helpful, consider giving it a kudo. If the response fully addresses your question, please mark it as the accepted solution. This will help us close the thread and ensure your question is resolved.

We appreciate your participation and are here to assist you further if you need it!

Thanks,

Rishabh
