Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to stream data from azure event hub to databricks delta table

gayatrikhatale
Contributor

Hi,
I want to stream data from Azure Event Hubs into a Databricks Delta table.

However, I want to authenticate with service principal details rather than an Event Hubs connection string.

Can anyone please share a code snippet?

Thank you!

2 ACCEPTED SOLUTIONS

szymon_dybczak
Esteemed Contributor III

Hi @gayatrikhatale ,

Unity Catalog now supports service credentials, so that is the recommended way to authenticate.
Service credentials generate short-lived authentication tokens and let you connect to Azure services without passwords or other long-lived secrets. Because they are managed by Unity Catalog, you can also limit who can use them, or allow their usage only from specific workspaces.

And the good news is that Unity Catalog service credentials support Azure Event Hubs:

Stream processing with Apache Kafka and Azure Databricks - Azure Databricks | Microsoft Learn

So, how to leverage this?

  1. Create a UC service credential if you don't have one.
  2. Assign the necessary roles to it on your Event Hubs namespace (e.g., Azure Event Hubs Data Receiver, Azure Event Hubs Data Sender).
  3. Specify the service credential name in the databricks.serviceCredential option when reading or writing data.

 

credential_name = "service-credential"
eh_server = "<host>.servicebus.windows.net:9093"

eh_opts = {
    "databricks.serviceCredential": credential_name,
    "kafka.bootstrap.servers": eh_server,
    "subscribe": "iocs",
    "startingOffsets": "earliest"
}
df = spark.readStream.format("kafka").options(**eh_opts).load()
display(df.selectExpr("CAST(value AS STRING) as value"))
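To complete the original ask (landing the events in a Delta table rather than just displaying them), the read above can be chained to writeStream. A minimal sketch, assuming a target table name and checkpoint path of your own choosing (delta_sink_options and stream_to_delta are illustrative helper names, not part of any API):

```python
def delta_sink_options(checkpoint_path):
    # A checkpoint location is required for every Structured Streaming sink.
    return {"checkpointLocation": checkpoint_path}

def stream_to_delta(spark, eh_opts, target_table, checkpoint_path):
    # Read from Event Hubs via its Kafka-compatible endpoint, decode the
    # payload, and append each micro-batch to a Unity Catalog Delta table.
    df = (spark.readStream.format("kafka").options(**eh_opts).load()
          .selectExpr("CAST(value AS STRING) AS value", "timestamp"))
    return (df.writeStream
              .options(**delta_sink_options(checkpoint_path))
              .toTable(target_table))
```

Called as stream_to_delta(spark, eh_opts, "catalog.schema.events", "/Volumes/.../checkpoints/events"), this returns the running StreamingQuery handle.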

 

 


gayatrikhatale
Contributor

Thank you @szymon_dybczak. It's working for me.

I have also found one more way to do the same thing. Below is the code snippet:

from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubConsumerClient

# Replace with your Event Hub details
event_hub_namespace = "Your-EventHub-Namespace.servicebus.windows.net"
event_hub_name = "Your-EventHub-Name"
consumer_group = "$Default"

# Use Managed Identity for authentication
credential = DefaultAzureCredential()

client = EventHubConsumerClient(
    fully_qualified_namespace=event_hub_namespace,
    eventhub_name=event_hub_name,
    consumer_group=consumer_group,
    credential=credential,
)

def on_event(partition_context, event):
    print("Received event: {}".format(event.body_as_str()))
    partition_context.update_checkpoint(event)

with client:
    client.receive(on_event=on_event)
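One caveat with this SDK approach: the receive() loop processes one event at a time on the driver, so if the goal is still a Delta table, it helps to buffer event bodies into batches before writing. A minimal batching helper as a sketch (batch_events and batch_size are illustrative names, not part of the azure-eventhub API):

```python
def batch_events(bodies, batch_size):
    """Group an iterable of event bodies into lists of at most batch_size."""
    batch = []
    for body in bodies:
        batch.append(body)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

Inside on_event you could append event.body_as_str() to such a buffer and, once a batch is full, append it to the table (for example with spark.createDataFrame(...).write.mode("append").saveAsTable(...)).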


3 REPLIES

szymon_dybczak
Esteemed Contributor III
Hi @gayatrikhatale ,

Great, thanks for sharing an alternative snippet. Also, if the answer was helpful to you, please consider marking it as a solution. That way we help others find answers to similar questions faster. 🙂
