โ05-05-2024 05:21 AM
Hello,
We are currently utilizing an autoloader with file listing mode for a stream, which is experiencing significant latency due to the non-incremental naming of files in the directoryโa condition that cannot be altered.
In an effort to mitigate this issue, I am exploring the transition to file notification mode. The source directory is hosted on Azure Data Lake Storage Gen2 (Premium BlockBlobStorage). I have ensured that the service principal is endowed with the requisite permissions and confirmed that our subscription has EventGrid listed as a registered Resource Provider.
However, I am confronted with a com.microsoft.azure.storage.StorageException, Caused by: java.net.UnknownHostException: <my_storage_name>.queue.core.windows.net. Given that premium ADLS Gen2 does not support queue storage, I am at a loss as to how I can effectively implement notification mode under these circumstances.
Any insights or guidance on this matter would be greatly appreciated.
Thank you in advance for your assistance!
โ05-05-2024 06:21 PM
Before turning on the file notification mode, was your pipeline able to access the same storage location or the one you're using now is new location?
Could you please share logs and screenshot of error messages?
โ05-06-2024 01:07 AM
Yes, the same storage location that works in directory listing mode, throws an exception in file notification mode.
full stacktrace:
com.microsoft.azure.storage.StorageException:
at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:220)
at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:882)
at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:869)
at com.microsoft.azure.storage.queue.CloudQueue.exists(CloudQueue.java:846)
at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.setUpAqsQueue(AzureEventNotificationSetup.scala:112)
at com.databricks.sql.aqs.autoIngest.AzureEventNotificationSetup.<init>(AzureEventNotificationSetup.scala:86)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.$anonfun$create$1(EventNotificationSetup.scala:68)
at com.databricks.sql.fileNotification.autoIngest.ResourceManagementUtils$.unwrapInvocationTargetException(ResourceManagementUtils.scala:42)
at com.databricks.sql.fileNotification.autoIngest.EventNotificationSetup$.create(EventNotificationSetup.scala:50)
at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.$anonfun$createSource$1(CloudFilesSourceProvider.scala:130)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.sql.fileNotification.autoIngest.CloudFilesSourceProvider.createSource(CloudFilesSourceProvider.scala:115)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:332)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:209)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:206)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:204)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:477)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:477)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:343)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:339)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:453)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:421)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:204)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:374)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:374)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.initSources(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:429)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:398)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:84)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:282)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:282)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:281)
Caused by: java.net.UnknownHostException: <my_storage_name>.queue.core.windows.net
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:613)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:293)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:264)
at sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:367)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:203)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1167)
at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1051)
at sun.net.www.protocol.http.HttpURLConnection$6.run(HttpURLConnection.java:1049)
at java.security.AccessController.doPrivileged(Native Method)
at java.security.AccessController.doPrivilegedWithCombiner(AccessController.java:784)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1048)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:189)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1577)
at sun.net.www.protocol.http.HttpURLConnection.access$200(HttpURLConnection.java:97)
at sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1497)
at sun.net.www.protocol.http.HttpURLConnection$9.run(HttpURLConnection.java:1495)
at java.security.AccessController.doPrivileged(Native Method)
at java.security.AccessController.doPrivilegedWithCombiner(AccessController.java:784)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1494)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:115)
... 64 more
โ05-06-2024 02:02 AM
Hi @Ulman ,
i think that by default this method will try to create Event Grid and Storage Queue on the same Storage Account as your data.
Please not that PREMIUM Blob Storage do not have QUEUE service.
In my opinion the easiest way would be to create manually queue and event grid and then point your code to use existing queue (and make this queue on regular storage account, not premium)
If you need more help on that approach, let me know ๐
โ06-04-2024 04:40 AM
Hello @Wojciech_BUK
I can't get it to work, with the all required permissions to the resource group and storage (even Contributor perms), it doesn't create the resources, always returns 'not authorized to perform this operation'. Tried to build dedicated python resource manager, doesn't work.
How can I pass the created instances of DataGrid and Storage Queue? I see the option ''cloudFiles.queueName', which is the Storage Queue name (I suppose). How can I pass the DataGrid name?
โ06-04-2024 07:04 AM
@v01d you can create those services by yourself from Azure Portal (or ARM or Powershell)
1. Create Storage Account e.g. "queue-receiver-storage"
2. In this Storage Create Queue e.g. "my-queue"
3. Go to your Premium Storage where your data lands and create Event Grid System Topic
(you will have to select appropriate action - when to trigger the Event Grid and point it to my-queue
4. Once this is done - test it - load file to Storage and check if there is anything in my-queue
5. then go to your Databricks Notebook and configure cloudFiles.connectionString to my-queue (e.g. conn string with SAS token).
Now run your Streming Nootebook - it should go to queue, pick list of files that has arrived to queue, go to storage account with your data and pick files listed in queue.
โ06-04-2024 07:47 AM - edited โ06-04-2024 07:48 AM
hi @Wojciech_BUK
I have a regular, not premium sa. When I create the DataGrid I don't see the option where I should set the queue name. Once I created the DataGrid, I can add the DataGrid Event Subscription (on the specific event type?) and then set the queue as an end-point. Is this a right way?
โ06-06-2024 10:12 AM - edited โ06-06-2024 10:25 AM
@Wojciech_BUKI found the issue, to create a queue the additional private-endpoint needed. This PE should have the sub-resource type queue. After creating this PE, I see the new queue in the storage account. But after it still have an error about EventGrid creation attempts, seems like more networking resources needed for this mode. The documentation of Auto Loader Notification mode should be completed with networking requirements I think.
โ12-05-2024 02:37 PM
I agree with you. After few weeks of back and forth with Databricks Support, we ultimately opened a Microsoft support ticket and in less than a day, our problem is solved by creating additional private endpoint for queue. By default, the private end points were available for dfs, blob and web e.g.<my_storage_name>.blob.core.windows.net.
There is no need of connection string or SAS key in autoloader parameters, if your service principal has appropriate permission configured as per Databricks documentation. However, documentation lacks additional PE configuration.
โ08-18-2024 02:56 PM
You should also reevaluate your use of premium storage for your landing area files. Typically, storage for raw files does not need to be the fastest and most resilient and expensive tier. Unless you have a compelling reason for premium storage for landing, I would suggest switching to standard. Ingest job overall performance should be impacted, neither should your write operations unless you are processing an extreme amount of data (Tens of terabytes) with multiple producer applications.
Just a suggestion.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group