Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Switching to File Notification Mode with ADLS Gen2 - Encountering StorageException

Ulman
New Contributor II

Hello,

We are currently using Auto Loader in directory listing mode for a stream, which is experiencing significant latency because the files in the directory are not named incrementally, a condition that cannot be altered.

In an effort to mitigate this issue, I am exploring the transition to file notification mode. The source directory is hosted on Azure Data Lake Storage Gen2 (Premium BlockBlobStorage). I have ensured that the service principal is endowed with the requisite permissions and confirmed that our subscription has EventGrid listed as a registered Resource Provider.

However, I am confronted with a com.microsoft.azure.storage.StorageException, Caused by: java.net.UnknownHostException: <my_storage_name>.queue.core.windows.net. Given that premium ADLS Gen2 does not support queue storage, I am at a loss as to how I can effectively implement notification mode under these circumstances.

Any insights or guidance on this matter would be greatly appreciated.
Thank you in advance for your assistance!

 

8 REPLIES

AmanSehgal
Honored Contributor III

@Ulman 

Before turning on file notification mode, was your pipeline able to access this same storage location, or is the one you're using now a new location?

Could you please share logs and screenshot of error messages?

Ulman
New Contributor II

Yes, the same storage location that works in directory listing mode throws an exception in file notification mode.

Ulman_0-1714982786447.png
full stacktrace:

com.microsoft.azure.storage.StorageException: 
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
	at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:220)
	at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:882)
	at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:869)
	at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:846)
	at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.setUpAqsQueue(AzureEventNotificationSetup.scala:112)
	at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.<init>(AzureEventNotificationSetup.scala:86)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.$anonfun$create$1(EventNotificationSetup.scala:68)
	at com.databricks.sql.fileNotification.autoIngest.ResourceManagementUtils$.unwrapInvocationTargetException(ResourceManagementUtils.scala:42)
	at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.create(EventNotificationSetup.scala:50)
	at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.$anonfun$createSource$1(CloudFilesSourceProvider.scala:130)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.createSource(CloudFilesSourceProvider.scala:115)
	at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:332)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:209)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:206)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:204)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:477)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:477)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:343)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:339)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:453)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:421)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:204)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:374)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:374)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.initSources(MicroBatchExecution.scala:390)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:429)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:398)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
	at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491)
	at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603)
	at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612)
	at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491)
	at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489)
	at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:84)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:282)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:282)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:281)
Caused by: java.net.UnknownHostException: <my_storage_name>.queue.core.windows.net
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:613)
	at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:293)
	at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
	at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
	at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1167)
	at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1051)
	at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1049)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.security.AccessController.doPrivilegedWithCombiner(AccessController.java:784)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1048)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1577)
	at sun.net.www.protocol.http.HttpURLConnection.access$200(HttpURLConnection.java:97)
	at sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1497)
	at sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1495)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.security.AccessController.doPrivilegedWithCombiner(AccessController.java:784)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1494)
	at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
	at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:115)
	... 64 more

 

Wojciech_BUK
Valued Contributor III

Hi @Ulman ,

I think that by default this method will try to create the Event Grid subscription and the Storage Queue on the same Storage Account as your data.

Please note that Premium Blob Storage does not have the Queue service.

In my opinion, the easiest way would be to create the queue and the Event Grid subscription manually and then point your code at the existing queue (and create this queue on a regular storage account, not a premium one).

If you need more help with that approach, let me know 🙂
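To confirm that the missing queue endpoint is what sits behind the `UnknownHostException`, a quick DNS lookup from the cluster may help. This is only a minimal sketch: `queue_endpoint_resolves` is a hypothetical helper name, and the only thing taken from the thread is the `<account>.queue.core.windows.net` host pattern from the stack trace. A failed lookup can also come from DNS or networking configuration, so treat the result as a hint rather than proof.

```python
import socket


def queue_endpoint_resolves(account_name, resolver=socket.gethostbyname):
    """Return True if <account_name>.queue.core.windows.net resolves in DNS.

    `resolver` is injectable so the check can be exercised without network
    access; by default it uses the standard library lookup.
    """
    host = f"{account_name}.queue.core.windows.net"
    try:
        resolver(host)
        return True
    except OSError:
        # socket.gaierror (a subclass of OSError) is raised when the name
        # cannot be resolved, which is the Python-side equivalent of the
        # java.net.UnknownHostException in the stack trace.
        return False


# Example call from a notebook cell (the account name is a placeholder):
# print(queue_endpoint_resolves("<my_storage_name>"))
```

If this returns False for the premium account but True for a standard account, pointing Auto Loader at a queue on the standard account is the likely way forward.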

v01d
New Contributor III

Hello @Wojciech_BUK 
I can't get it to work. Even with all the required permissions on the resource group and storage account (even Contributor), it doesn't create the resources and always returns 'not authorized to perform this operation'. I also tried building a dedicated Python resource manager, which doesn't work either.
How can I pass in existing Event Grid and Storage Queue instances? I see the option 'cloudFiles.queueName', which I suppose is the Storage Queue name. How can I pass the Event Grid name?

Wojciech_BUK
Valued Contributor III

@v01d you can create those services yourself from the Azure Portal (or with ARM or PowerShell):
1. Create a Storage Account, e.g. "queuereceiverstorage" (account names allow only lowercase letters and digits).
2. In this Storage Account, create a Queue, e.g. "my-queue".
3. Go to the Premium Storage account where your data lands and create an Event Grid System Topic (you will have to select the appropriate event type, i.e. when to trigger Event Grid, and point it to my-queue).
4. Once this is done, test it: upload a file to the data storage account and check whether anything arrives in my-queue.
5. Then go to your Databricks notebook and configure cloudFiles.connectionString for my-queue (e.g. a connection string with a SAS token).

Now run your streaming notebook. It should go to the queue, pick up the list of files that have arrived there, then go to the storage account with your data and read the files listed in the queue.
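The notebook side of the steps above could look roughly like this. It is a sketch under assumptions, not a tested setup: `notification_options` is a hypothetical helper, the file format, queue name, connection string and `abfss://` path are placeholders, and the option names (`cloudFiles.useNotifications`, `cloudFiles.queueName`, `cloudFiles.connectionString`) are the Auto Loader options for consuming an existing queue instead of letting Auto Loader create one.

```python
def notification_options(queue_name, connection_string, file_format="json"):
    """Build Auto Loader options that read events from an existing queue."""
    if not queue_name or not connection_string:
        raise ValueError("queue_name and connection_string are required")
    return {
        "cloudFiles.format": file_format,
        # Switch from directory listing to file notification mode.
        "cloudFiles.useNotifications": "true",
        # Existing queue on the standard (non-premium) storage account.
        "cloudFiles.queueName": queue_name,
        # Connection string for the account that hosts the queue
        # (placeholder; keep the real value in a secret scope).
        "cloudFiles.connectionString": connection_string,
    }


opts = notification_options("my-queue", "<queue-connection-string>")

# In a Databricks notebook, where `spark` is already defined:
# df = (spark.readStream
#       .format("cloudFiles")
#       .options(**opts)
#       .load("abfss://<container>@<premium_account>.dfs.core.windows.net/<path>"))
```

Keeping the options in one dictionary makes it easy to check what is being passed before the stream starts, and to swap the placeholder connection string for a value read from a secret scope.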

v01d
New Contributor III

hi @Wojciech_BUK 
I have a regular, non-premium storage account. When I create the Event Grid resource, I don't see an option for setting the queue name. Once it is created, I can add an Event Grid Event Subscription (on a specific event type?) and then set the queue as the endpoint. Is this the right way?

v01d
New Contributor III

@Wojciech_BUK I found the issue: creating a queue requires an additional private endpoint (PE) with the sub-resource type queue. After creating this PE, I see the new queue in the storage account. But I still get an error about the Event Grid creation attempts, so it seems more networking resources are needed for this mode. I think the Auto Loader file notification mode documentation should be completed with the networking requirements.

Rah_Cencora
New Contributor II

You should also reevaluate your use of premium storage for your landing area files. Typically, storage for raw files does not need to be the fastest, most resilient, and most expensive tier. Unless you have a compelling reason to use premium storage for landing, I would suggest switching to standard. Overall ingest job performance should not be impacted, and neither should your write operations, unless you are processing an extreme amount of data (tens of terabytes) with multiple producer applications.

Just a suggestion. 
