โ07-05-2024 02:37 AM
I am using this given code to read from a source location in ADLS Gen 2 Azure Storage Container.
โ07-06-2024 06:50 AM - edited โ07-06-2024 06:53 AM
Hi,
Could you check if databricks has created an event grid resource in the resource group where your storage account is located and if it has created a queue? (below screens with queue and event grid)
Assumig you wanted to use Databricks Auto Loader to setup a notification service and queue service for you, you need to have service principal with required permissions to make it work (more on that on this link What is Auto Loader file notification mode? - Azure Databricks | Microsoft Learn).
Also, make sure that your files names not begin with an underscore โ_โ, otherwise, files will be ignored by the autoloader.
Below is the code I've used to setup file notification mode and test incremental loading. It worked without issue.
You can try this and check if it'll work for you.
checkpoint_path = "abfss://your_container@storage_acc_name.dfs.core.windows.net/_checkpoint/dev_table"
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.option("cloudFiles.useNotifications", "true")
.option("multiLine", "false")
.option('cloudFiles.allowOverwrites', 'true')
.option("cloudFiles.useNotifications", 'true')
.option("cloudFiles.fetchParallelism", 1)
.option("cloudFiles.subscriptionId", "subscription_id")
.option("cloudFiles.tenantId", "tenant_id")
.option("cloudFiles.clientId", "client_id")
.option("cloudFiles.clientSecret", "client_secret")
.option("cloudFiles.resourceGroup", "resource_group_name")
.load("abfss://your_container@storage_acc_name.dfs.core.windows.net/path_to_files")
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable("catalog_name.schema_name_table_name"))
Also, pretty useful to check what files have been discovered by an Auto Loader stream is below function:
SELECT * FROM cloud_files_state('path/to/checkpoint');
โ07-07-2024 11:03 PM - edited โ07-07-2024 11:04 PM
Hello @szymon_dybczak ,
I have provided the required permissions for the Service principal. The Queue is generated in the Storage Account. We are actually not using writeStream functionalities, we are using Unity Catalog and storing the data via Delta Live Pipelines into Delta Live Tables. So In that context how can we use the checkpoint option.
One more observation from my side, when queue is cleared or deleted and when run incrementally the pipeline is accepting the new files.
Thanks for the response.
Regards,
Ibrahim
โ07-08-2024 01:05 AM - edited โ07-08-2024 01:27 AM
Hi @ibrahim21124,
Oh, so since you are using DLT then these write and checkpoints steps are handled for you by DLT framework. So maybe let's try do it step by step and monitor what happens inside checkpoint after each pipeline execution.
So what I would try is to once again clear/delete queue and run pipeline. Then I would check if all files were correctly saved in RocksDB state store using below function. If you use Delta Live Tables then checkpoints are stored under the storage location specified in the DLT settings. Each table gets a dedicated directory under storage_location/checkpoints/dlt_table_name.
SELECT * FROM cloud_files_state('path/to/checkpoint');
Then I would upload additional files to storage and once again use above functions to check if auto loader detected new file and add it to state store.
One additional question, do new files stuck at the queue? I mean, after you upload new file and event grid sends the notification to storage queue, do you see those files in queue or does queue is empty?
Regards,
slash
โ07-08-2024 11:17 PM
Hey @szymon_dybczak ,
I checked my Storage Container and I am not getting any folder with the given checkpoint path. Do I need to setup anything differently for that? The concern which I am having is that the Incremental load was working well when using the directory listing mode but when changed to File notification mode it stopped picking up new files.
We are also not using rocksDB state store. We are using files store in ADLS Gen2 Storage container as an input and then dumping them in Streaming Delta Live Tables in Unity Catalog.
Thanks and Regards,
Ibrahim
โ07-08-2024 11:56 PM
Hi @ibrahim21124 ,
If you are using auto loader you also implicitly using RockDB - it stores state data at a checkpoint position within the RocksDB key-value store.
To find your checkpoint location follow the answer of Jacek Laskowski: https://stackoverflow.com/a/75716559
Also, under the hood autoloader is based on spark structured streaming model. I think it will be good for you to familiarize with documention, definitly will clarify a lot of concepts like checkpointing, state store etc.
Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)
a month ago
Was there a resolution to this issue? We are experiencing similar symptoms after switching to file notification mode. It would be very helpful to the community if someone would update this thread with the resolution if one was found.
Also, you don't need to know about the checkpoint location for the cloud_files_state function call. You just pass the TABLE(table name) to the call instead.
โ07-18-2024 08:57 AM
Hi @ibrahim21124 ,
Thank you for reaching out to our community! We're here to help you.
To ensure we provide you with the best support, could you please take a moment to review the response and choose the one that best answers your question? Your feedback not only helps us assist you better but also benefits other community members who may have similar questions in the future.
If you found the answer helpful, consider giving it a kudo. If the response fully addresses your question, please mark it as the accepted solution. This will help us close the thread and ensure your question is resolved.
We appreciate your participation and are here to assist you further if you need it!
Thanks,
Rishabh
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group