Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to stream Azure Event Hubs data to a Databricks Delta table

Areqio
New Contributor II

I am trying to stream my IoT data from Azure Event Hubs to Databricks. I'm running Databricks Runtime 17.3 LTS with Scala 2.13.

3 REPLIES

balajij8
Contributor III

Hi @Areqio 

You can use Lakeflow Declarative Pipelines to stream Azure Event Hubs IoT data into Databricks Delta tables. Lakeflow Spark Declarative Pipelines builds on Spark Structured Streaming and lets you write just a few lines of declarative Python or SQL to create robust pipelines.

More details here
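For a sense of what "a few lines" looks like, here is a minimal sketch of a pipeline table definition fed from the Event Hubs Kafka endpoint. The names and placeholder values are illustrative only; the Databricks docs on using Event Hubs as a pipeline source show the full wiring.

import dlt

# Illustrative placeholders: replace with your namespace, event hub name, and SASL settings.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "<namespace>.servicebus.windows.net:9093",
    "subscribe": "<event-hub-name>",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": "<JAAS config built from the Event Hubs connection string>",
}

# Streaming table that continuously lands the raw events.
@dlt.table(name="iot_events_raw")
def iot_events_raw():
    return spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()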

Louis_Frolio
Databricks Employee

Hi @Areqio,

+1 to what @balajij8 suggested about using Lakeflow Declarative Pipelines as the simplest, supported way to land Azure Event Hubs IoT data into Delta. Lakeflow Spark declarative pipelines are built on top of Structured Streaming, so you get robust pipeline orchestration with only a small amount of Python/SQL instead of wiring everything manually in Scala.

A few concrete points from the official docs on "Use Azure Event Hubs as a pipeline data source" that are worth calling out:

  1. Use the Kafka endpoint, not the old Event Hubs connector

    • The classic azure-event-hubs-spark connector is not available in Databricks Runtime for Lakeflow, and Lakeflow pipelines don't allow adding third-party JVM libraries.
    • Instead, Event Hubs exposes an Apache Kafka-compatible endpoint that you read with the built-in Structured Streaming Kafka connector that's already in the runtime.
  2. Authenticate with SAS via secrets

    • Event Hubs gives you a namespace, hub name, and a shared access policy (name + key); the docs recommend putting the key into a Databricks secret scope (via CLI) and reading it in the pipeline rather than hard-coding it.
    • The example builds a connection string like:
      Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={POLICY_NAME};SharedAccessKey={POLICY_KEY} and then uses that in the Kafka sasl.jaas.config option.
  3. Minimal Lakeflow pipeline pattern (Python)
    The docs show a pattern roughly like this (simplified here; see the sketch after this list):

    • Configure Event Hubs and Kafka options from pipeline settings (e.g. iot.ingestion.eh.namespace, iot.ingestion.eh.name, iot.ingestion.kafka.requestTimeout, etc.).
    • Store the SAS key in a secret scope, retrieve it with dbutils.secrets.get, and construct the Kafka SASL_SSL options.
    • In the pipeline code, use spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load() to read from the Event Hubs topic and then parse your IoT JSON payload into a typed schema before writing to a Delta table (typically a bronze table with date partitioning on an event or enqueue timestamp).

    The official doc (AWS flavor but the Event Hubs/Kafka wiring is the same on Azure) is here:
    Use Azure Event Hubs as a pipeline data source: https://docs.databricks.com/aws/en/ldp/event-hubs

  4. Where your 17.3 LTS / Scala 2.13 fits in

    • Your compute runtime (17.3 LTS, Scala 2.13) is fine for running the Kafka structured streaming side of this; the key change is that the pipeline definition itself is usually written in Python or SQL for Lakeflow. The docs only show Python examples today; I don't know of a Scala API for Lakeflow declarative pipelines at this time.
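Putting points 2 and 3 together, a simplified Python sketch of the documented pattern could look like the one below. The pipeline config keys, the secret scope/key names, the table name, and the payload schema are assumptions you would adapt to your own setup:

import dlt
from pyspark.sql.functions import col, from_json, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Event Hubs settings read from the pipeline configuration (keys are illustrative).
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
SAS_POLICY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")

# The SAS key is stored in a secret scope rather than hard-coded (scope/key names assumed).
SAS_KEY = dbutils.secrets.get(scope="iot-ingestion", key="eh-access-key")

# SAS connection string in the format described in point 2 above.
EH_CONN_STR = (
    f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;"
    f"SharedAccessKeyName={SAS_POLICY_NAME};SharedAccessKey={SAS_KEY}"
)

# Options for the built-in Structured Streaming Kafka connector, pointed at the
# Event Hubs Kafka-compatible endpoint.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": f"{EH_NAMESPACE}.servicebus.windows.net:9093",
    "subscribe": EH_NAME,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{EH_CONN_STR}";'
    ),
    "startingOffsets": "latest",
}

# Assumed IoT payload schema; replace with your device's actual JSON fields.
payload_schema = StructType([
    StructField("deviceId", StringType()),
    StructField("temperature", DoubleType()),
    StructField("eventTime", TimestampType()),
])

# Bronze table: parse the JSON payload into typed columns and partition by enqueue date.
@dlt.table(name="iot_bronze", partition_cols=["event_date"])
def iot_bronze():
    return (
        spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
        .select(
            from_json(col("value").cast("string"), payload_schema).alias("payload"),
            col("timestamp").alias("enqueued_at"),
        )
        .select("payload.*", "enqueued_at", to_date(col("enqueued_at")).alias("event_date"))
    )

The bronze table here just keeps the typed payload plus the Kafka enqueue timestamp; heavier work (deduplication, joins, aggregation) is usually left to downstream silver tables.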

If you prefer not to adopt Lakeflow yet, you can still follow the same Kafka configuration (using the Event Hubs Kafka endpoint + SASL_SSL) directly in a classic Structured Streaming job and then write to a Delta table from Scala.

 

Hope this helps.

 

Cheers, Lou.

rohan22sri
New Contributor III

Hi @Areqio ,

If you want to use Scala with Azure Event Hubs in Databricks Runtime 17.3 LTS (Scala 2.13), a practical approach is to use Structured Streaming via the Kafka endpoint of Event Hubs.

Below is a reusable helper function to build the Kafka options, followed by an example of how to call it and what each part means.

def getKafkaOptions(
  env: String,
  ehNameSpace: String,
  ehName: String,
  scopeName: String,
  kafkaOffset: String,
  ehConnKey: String,
  maxOffsetsPerTrigger: String = "50000"
): Map[String, String] = {

  val connStr = dbutils.secrets.get(scope = scopeName, key = ehConnKey)

  Map(
    "kafka.bootstrap.servers" -> s"$ehNameSpace.servicebus.windows.net:9093",
    "subscribe" -> ehName,
    "kafka.sasl.mechanism" -> "PLAIN",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.sasl.jaas.config" ->
      s"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$connStr";""",
    "startingOffsets" -> kafkaOffset,
    "failOnDataLoss" -> "false",
    "maxOffsetsPerTrigger" -> maxOffsetsPerTrigger
  )
}

val kafkaOptions = getKafkaOptions(
  env = "dev",
  ehNameSpace = "mynamespace",
  ehName = "myeventhub",
  scopeName = "my-secret-scope",
  kafkaOffset = "latest",
  ehConnKey = "eventhub-connection-string"
)

val df = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
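// df now holds the raw Kafka columns (key, value, topic, partition, offset, timestamp);
// `value` is binary, so in practice you would cast it to a string and parse the IoT JSON
// into a typed schema, either before the write below or in a downstream silver table.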

val query = df.writeStream
  .format("delta")
  .option("checkpointLocation", "/mnt/checkpoints/eventhub")
  .start("/mnt/output/eventhub")
  
  
What This Means

• getKafkaOptions(...): builds the configuration required to connect securely to Event Hubs using its Kafka-compatible endpoint.
• dbutils.secrets.get(...): fetches the Event Hub connection string securely from a Databricks secret scope instead of hardcoding it.
• kafka.bootstrap.servers: points to your Event Hub namespace Kafka endpoint.
• subscribe: the Event Hub name (treated like a Kafka topic).
• SASL/SSL configs: the authentication mechanism required for connecting to Azure Event Hubs via Kafka.
• startingOffsets: controls where to start reading from: "latest" → only new events; "earliest" → read from the beginning.
• maxOffsetsPerTrigger: limits how much data is processed per micro-batch (helps control load).
• readStream: creates a streaming DataFrame from Event Hubs.
• writeStream: writes the streaming data to a Delta table (or any supported sink).

Additional Notes
For Scala workloads, you'll likely need classic compute, as serverless support for Scala is still limited.
If you want a more managed approach with serverless, consider using Delta Live Tables / Lakeflow (Declarative Pipelines), but those currently favor Python/SQL over Scala.

Rohan