Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Adding a message to Azure Service Bus

Sega2
New Contributor III

I am trying to send a message to a Service Bus in Azure, but I get the following error:
ServiceBusError: Handler failed: DefaultAzureCredential failed to retrieve a token from the included credentials.

This is the line that fails: credential = DefaultAzureCredential()

Does anyone have a sample showing how to add a message to a Service Bus or similar?

Notebook:

import nest_asyncio
import asyncio
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

from azure.identity.aio import DefaultAzureCredential

nest_asyncio.apply()

local_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()

FULLY_QUALIFIED_NAMESPACE = "xxx.servicebus.windows.net"
TOPIC_NAME = "xxoutbound"

credential = DefaultAzureCredential()
token = credential.get_token('xxx')
print(token)

async def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    await sender.send_messages(message)
    print("Sent a single message")

async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True) as servicebus_client:
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        async with sender:
            await send_single_message(sender)

    await credential.close()

asyncio.run(run())

2 REPLIES

Panda
Valued Contributor

@Sega2 - Can you explicitly use ClientSecretCredential and try again?

stbjelcevic
Databricks Employee

It looks like the issue is with the Azure credential chain rather than Service Bus itself; in Databricks notebooks, DefaultAzureCredential won’t succeed unless there’s a valid identity available (env vars, CLI login, managed identity, or a Databricks “service credential” in Unity Catalog).

Why DefaultAzureCredential fails in Databricks

  • No Azure identity present on the cluster: DefaultAzureCredential tries environment variables, then managed identity, then Azure CLI, etc.; if none of them is configured, every source in the chain fails and token acquisition raises the error you are seeing.

  • In Databricks, the recommended way to call Azure SDKs from notebooks is to use Unity Catalog Service Credentials (which wrap a managed identity or service principal) and pass that to the Azure SDK client, rather than instantiating DefaultAzureCredential directly.
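To make the chain behaviour concrete, here is a rough, standard-library-only model of "try each source in order, fail if none works". It is not the azure-identity implementation; the names first_available_token, ChainExhaustedError and the three source labels are made up for illustration.

```python
from typing import Callable, Optional, Sequence, Tuple


class ChainExhaustedError(Exception):
    """Raised when no source in the chain yields a token."""


# Each source is (name, function returning a token string or None).
Source = Tuple[str, Callable[[], Optional[str]]]


def first_available_token(sources: Sequence[Source]) -> Tuple[str, str]:
    """Try each source in order; return (source_name, token) from the first that works."""
    attempted = []
    for name, fetch in sources:
        token = fetch()
        if token is not None:
            return name, token
        attempted.append(name)
    raise ChainExhaustedError("no token from: " + ", ".join(attempted))


# Hypothetical cluster with no identity configured: every source comes back empty.
bare_cluster = [
    ("environment", lambda: None),
    ("managed_identity", lambda: None),
    ("azure_cli", lambda: None),
]

try:
    first_available_token(bare_cluster)
except ChainExhaustedError as err:
    print(err)
```

As soon as any one source returns a token, the chain stops there, which is why configuring a single identity source is enough.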

Two working approaches

1) Use a Unity Catalog Service Credential (recommended)

This avoids storing secrets and works well from Databricks notebooks.

High-level steps:

  • Create a Service Credential in Unity Catalog backed by an Azure managed identity via the Databricks Access Connector, and grant it rights to your Service Bus namespace/topic (Send permission on the topic or namespace).

  • In your notebook, retrieve the credential with dbutils.credentials.getServiceCredentialsProvider('<service-credential-name>') and pass it to the Service Bus client.

Example (async):

%pip install azure-servicebus==7.12.1 azure-identity==1.17.0
 
import nest_asyncio, asyncio
nest_asyncio.apply()

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
# IMPORTANT: use UC Service Credential provider, not DefaultAzureCredential
credential = dbutils.credentials.getServiceCredentialsProvider("my-service-credential")  # name in UC

FULLY_QUALIFIED_NAMESPACE = "xxx.servicebus.windows.net"
TOPIC_NAME = "xxoutbound"

async def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    await sender.send_messages(message)
    print("Sent a single message")

async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        async with sender:
            await send_single_message(sender)

asyncio.run(run())

Notes:

  • The identity behind the UC service credential must have permission to send to the topic; assign RBAC (e.g., the “Azure Service Bus Data Sender” role) on the Service Bus resource to the managed identity attached to the Access Connector.

  • If you prefer setting a default service credential for the cluster, you can set DATABRICKS_DEFAULT_SERVICE_CREDENTIAL_NAME as an environment variable on the cluster and then use SDK defaults, but naming it explicitly in code is more portable.

2) Use DefaultAzureCredential with an actual identity source

If you still want DefaultAzureCredential, ensure one of its sources is available on the cluster:

Options:

  • Configure environment variables for a service principal: AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET on the cluster, and grant that SP sender permissions on Service Bus.

  • Attach a managed identity to the Databricks Access Connector and let DefaultAzureCredential pick it up if you also set it as the default service credential for the cluster. In Databricks classic compute, you can set DATABRICKS_DEFAULT_SERVICE_CREDENTIAL_NAME, then simply use the Azure SDK without passing a credential explicitly.
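For the service principal option, a quick sanity check before instantiating DefaultAzureCredential is to confirm the three variables are actually visible to the notebook process. A small sketch, where the helper name missing_sp_env_vars is made up for illustration:

```python
import os
from typing import List, Mapping, Optional

# The variables listed above for a client-secret service principal.
REQUIRED_SP_VARS = ("AZURE_CLIENT_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_SECRET")


def missing_sp_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SP_VARS if not env.get(name)]


missing = missing_sp_env_vars()
if missing:
    print("Not set:", ", ".join(missing))
else:
    print("All service principal variables are set")
```

If anything is reported as not set, the environment step of the credential chain has nothing to work with on that cluster.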

Example (DefaultAzureCredential after setting identity):

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
import asyncio, nest_asyncio
nest_asyncio.apply()

FULLY_QUALIFIED_NAMESPACE = "xxx.servicebus.windows.net"
TOPIC_NAME = "xxoutbound"

credential = DefaultAzureCredential()

async def run():
    async with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential=credential, logging_enable=True) as sb:
        sender = sb.get_topic_sender(topic_name=TOPIC_NAME)
        async with sender:
            await sender.send_messages(ServiceBusMessage("hello"))

    await credential.close()

asyncio.run(run())

Common pitfall in your snippet:

  • 'xxx' in get_token('xxx') is not a valid scope for Azure AD. You normally don’t need to call get_token yourself, because the Service Bus client fetches tokens internally when you send messages. If you do want to test token acquisition, use a resource scope such as "https://servicebus.azure.net/.default". With UC service credentials, you usually don’t call get_token directly at all.
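A related detail if you do keep a manual token test: the credentials in azure.identity.aio have an async get_token, so it has to be awaited. The sketch below uses a made-up StandInAsyncCredential (not the real class) purely to show the difference between calling and awaiting, so it runs without any Azure packages or identity.

```python
import asyncio
import inspect
from collections import namedtuple

Token = namedtuple("Token", ["token", "expires_on"])


class StandInAsyncCredential:
    """Hypothetical stand-in; only mimics the awaitable get_token() shape."""

    async def get_token(self, *scopes):
        return Token(token="stand-in for " + scopes[0], expires_on=0)


SERVICE_BUS_SCOPE = "https://servicebus.azure.net/.default"
credential = StandInAsyncCredential()

# Without await you only get a coroutine object, not a token.
pending = credential.get_token(SERVICE_BUS_SCOPE)
print(inspect.iscoroutine(pending))
pending.close()  # discard the un-awaited coroutine cleanly


async def fetch_token():
    # Awaiting is what actually produces the token.
    return await credential.get_token(SERVICE_BUS_SCOPE)


token = asyncio.run(fetch_token())
print(token.token)
```

With the real DefaultAzureCredential from azure.identity.aio, the same pattern applies: call get_token inside an async function and await it.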

Quick checklist to fix your notebook

  • Decide on an auth method:

    • Prefer UC Service Credential and use dbutils.credentials.getServiceCredentialsProvider("name") in code.

    • Or supply SP env vars or managed identity so DefaultAzureCredential has something to use.

  • Grant the chosen identity “Send” rights on the Service Bus topic/namespace.

  • Remove manual get_token('xxx') call; let the Service Bus SDK obtain tokens using the credential you pass.

  • Keep the async pattern you have; it’s correct for azure.servicebus.aio.