szymon_dybczak
Esteemed Contributor III

Hi,

Could you check whether Databricks has created an Event Grid resource and a queue in the resource group where your storage account is located? (The screenshots below show the queue and the Event Grid resource.)

[Screenshots: the storage queue and the Event Grid resource created in the storage account's resource group]

Assuming you want Databricks Auto Loader to set up the notification service (Event Grid) and the queue service for you, you need a service principal with the required permissions (more on that here: What is Auto Loader file notification mode? - Azure Databricks | Microsoft Learn).
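
If you don't want the service principal credentials hard-coded in the notebook, one option is to keep them in a Databricks secret scope and build the notification-mode options from there. Below is a minimal sketch; the function name notification_options and the secret key names (tenant-id, client-id, client-secret) are hypothetical, while the option names are the same ones used in the code further down in this post.

```python
from typing import Callable, Dict


def notification_options(
    get_secret: Callable[[str], str],
    subscription_id: str,
    resource_group: str,
) -> Dict[str, str]:
    """Build Auto Loader options for file notification mode.

    get_secret maps a key name to a secret value. The key names used here
    ("tenant-id", "client-id", "client-secret") are hypothetical.
    """
    return {
        # Let Auto Loader create the Event Grid subscription and the queue
        "cloudFiles.useNotifications": "true",
        "cloudFiles.subscriptionId": subscription_id,
        "cloudFiles.resourceGroup": resource_group,
        # Service principal credentials, read from the secret store
        "cloudFiles.tenantId": get_secret("tenant-id"),
        "cloudFiles.clientId": get_secret("client-id"),
        "cloudFiles.clientSecret": get_secret("client-secret"),
    }
```

In a notebook you would pass something like lambda k: dbutils.secrets.get(scope="autoloader-sp", key=k) as get_secret (the scope name is hypothetical) and apply the result with .options(**opts) on spark.readStream.format("cloudFiles").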

Also, make sure your file names do not begin with an underscore ('_'); otherwise, Auto Loader will ignore those files.
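
A quick way to spot such files before blaming the notification setup is to list the source folder and flag names starting with an underscore. This is a sketch that encodes only the underscore rule mentioned above and checks only the file name itself, not parent folders; the helper name is hypothetical.

```python
import posixpath
from typing import Iterable, List


def files_autoloader_would_skip(paths: Iterable[str]) -> List[str]:
    """Return the paths whose file name (last path segment) starts with '_'."""
    return [p for p in paths if posixpath.basename(p).startswith("_")]
```

In Databricks you could feed it [f.path for f in dbutils.fs.ls(source_path)], which lists only the top level of source_path.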

Below is the code I used to set up file notification mode and test incremental loading. It worked without issues for me, so you can try it and check whether it works for you.

checkpoint_path = "abfss://your_container@storage_acc_name.dfs.core.windows.net/_checkpoint/dev_table"

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Where Auto Loader stores the inferred schema (reusing the checkpoint path is fine)
  .option("cloudFiles.schemaLocation", checkpoint_path)
  # File notification mode: Auto Loader creates the Event Grid subscription and queue
  .option("cloudFiles.useNotifications", "true")
  # One JSON object per line
  .option("multiLine", "false")
  .option("cloudFiles.allowOverwrites", "true")
  # Number of threads used to fetch messages from the queue
  .option("cloudFiles.fetchParallelism", 1)
  # Service principal used to create the notification resources
  # (prefer dbutils.secrets.get over hard-coded credentials)
  .option("cloudFiles.subscriptionId", "subscription_id")
  .option("cloudFiles.tenantId", "tenant_id")
  .option("cloudFiles.clientId", "client_id")
  .option("cloudFiles.clientSecret", "client_secret")
  .option("cloudFiles.resourceGroup", "resource_group_name")
  .load("abfss://your_container@storage_acc_name.dfs.core.windows.net/path_to_files")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  # Process everything available now, then stop
  .trigger(availableNow=True)
  .toTable("catalog_name.schema_name.table_name"))
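
Since the stream reads JSON with multiLine set to false, every test file needs one JSON object per line (JSON Lines). To test incremental loading, a small helper like the one below can generate such files, which you then upload to path_to_files between availableNow runs. It is a sketch; write_json_lines and file names such as batch_001.json are hypothetical.

```python
import json
from typing import Any, Iterable, Mapping


def write_json_lines(path: str, records: Iterable[Mapping[str, Any]]) -> int:
    """Write one JSON object per line (the layout multiLine=false expects).

    Returns the number of records written.
    """
    count = 0
    with open(path, "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
            count += 1
    return count
```

After uploading a second file such as batch_002.json and rerunning the stream, only the new file should be ingested.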

Also, to check which files an Auto Loader stream has discovered, the cloud_files_state function is pretty useful:

SELECT * FROM cloud_files_state('path/to/checkpoint');
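
The same check can be run from Python. The sketch below only builds the query string (cloud_files_state_query is a hypothetical helper); the point is to pass the exact path used as checkpointLocation for the stream. It rejects paths containing a single quote rather than guessing at escaping rules.

```python
def cloud_files_state_query(checkpoint_path: str) -> str:
    """Return SQL listing the files discovered by the stream at checkpoint_path."""
    if "'" in checkpoint_path:
        # Keep the sketch simple: refuse paths that would break the string literal
        raise ValueError("checkpoint path must not contain a single quote")
    return f"SELECT * FROM cloud_files_state('{checkpoint_path}')"
```

In a Databricks notebook you could then run display(spark.sql(cloud_files_state_query(checkpoint_path))).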