Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
PeteStern
Databricks Employee


by Peter Stern & Sanja Sandic

A couple of years ago, we, two Solutions Architects at Databricks, were working with a customer to maximize the performance when reading from Azure Event Hubs into Databricks.  They had 360 million events coming in every hour across 14 Event Hubs and were running a 160-core cluster 24x7 just to keep up with the load.  Needless to say, this was far too expensive.

Unfortunately, we found there wasn’t a great single resource out there to figure out what could be going wrong, so we did hours of research reading Azure and Spark documentation, asking colleagues, and running our own experiments. We managed to reduce their cluster size to only 16 cores while comfortably keeping up with their Event Hubs, a cost reduction of 90%!

This article summarizes our findings. We'll walk you through scaling concepts in Event Hubs, how to tune Event Hubs and Spark, and some interlinked behaviours between Spark and Event Hubs you may not be aware of. We’ve also provided plenty of links where we think you might find valuable information. Hopefully, this makes your journey with both Event Hubs and Spark just a little bit better.

 

Important Event Hubs Scaling Concepts

This section gives an overview of the Event Hubs concepts you will need to understand in order to get great performance from Event Hubs.

You can choose different pricing tiers for your Event Hubs. Each tier has different options and limits for partitioning and throughput capacity. For simplicity, we will focus on the standard tier, but a full list of tiers and their capabilities and limits can be found in the docs.

Partitioning

Partitioning is the main method for how Event Hubs scales.  By default, each partition will be read by one Spark task, which we’ll discuss further in the “Implementing for Performance” section.

The number of partitions is set by configuration at the time the Event Hub is created (note that some tiers allow increasing the partitions at a later point without the ability to decrease them). Our experiments show that Spark can read up to 10,000 events per second from each Event Hub partition.  So to get more throughput, one of the main levers is to increase the number of partitions, but be careful not to go too high.  If each partition is only getting a little bit of traffic (<100 events per second) it’s just extra work for Spark to fetch the data.  So it’s a balancing act.  The number of partitions you can configure depends on which pricing tier you are using.  For example, the standard tier allows for a maximum of 32.
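As a rough illustration of this balancing act, here is a minimal sketch that turns an expected event rate into a partition count. The 10,000 events per second per partition figure and the standard-tier cap of 32 come from the text above; the function name and the 1.5x headroom factor are our own assumptions for illustration, not a recommendation from Azure or Spark.

```python
import math


def estimate_partitions(events_per_second, per_partition_rate=10_000,
                        headroom=1.5, max_partitions=32):
    """Estimate how many Event Hub partitions an event rate needs.

    per_partition_rate: events/sec Spark read per partition in our experiments.
    headroom: hypothetical safety factor to absorb bursts (an assumption).
    max_partitions: the standard-tier limit mentioned in the text.
    """
    if events_per_second <= 0:
        raise ValueError("events_per_second must be positive")
    needed = math.ceil(events_per_second * headroom / per_partition_rate)
    # Never go below one partition or above the tier limit.
    return max(1, min(needed, max_partitions))


# 360 million events per hour spread evenly across 14 Event Hubs
per_hub_rate = 360_000_000 / 3600 / 14
print(round(per_hub_rate), estimate_partitions(per_hub_rate))
```

Treat the result as a starting point only. Event size, skew, and Throughput Unit limits all change the per-partition rate you will actually see.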

It’s also important to know that ordering in Event Hub is only guaranteed within a single partition. If order is important for your use case you need to either specify a single partition or map events to partitions.  You can set a key that will be used to map events to partitions, giving considerable control.  If you do change the default partition mapping, you should be careful about the potential for skew, which can cause its own problems.
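To make the skew risk concrete, the sketch below maps keys to partitions with a simple hash and counts how many events land on each partition. This is illustrative only: Event Hubs uses its own internal hashing, so the MD5-based `partition_for_key` function here is a stand-in we made up, and the device names are hypothetical.

```python
import hashlib
from collections import Counter


def partition_for_key(key, num_partitions):
    """Map a key to a partition with a stable hash (stand-in for the service's own hashing)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def partition_load(keys, num_partitions):
    """Count how many events each partition would receive for the given keys."""
    counts = Counter(partition_for_key(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# One "hot" device emits 900 events; 100 other devices emit one event each.
keys = ["device-1"] * 900 + [f"device-{i}" for i in range(2, 102)]
load = partition_load(keys, 4)
skew_ratio = max(load) / (sum(load) / len(load))
print(load, round(skew_ratio, 2))
```

Every event with the same key lands on the same partition, which preserves ordering for that key. The flip side is that a single hot key concentrates most of the traffic on one partition.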

Figure: Overview of Azure Event Hub with Databricks

Throughput Capacity

There are different types of charging/throttling mechanisms for Event Hubs depending on the pricing tier. The standard tier uses Throughput Units (TUs), but all pricing tiers have some equivalent.

Throughput Units are configured at the Namespace level. A Namespace is just a collection of Event Hubs. If you put multiple Event Hubs into the same Namespace, they will all share the same Throughput Units. If you need more TUs per Event Hub then simply put them into separate Namespaces. 

A single Throughput Unit provides:

  • Ingress: up to 1 MB per second or 1,000 events per second (whichever comes first).
  • Egress: up to 2 MB per second or 4,096 events per second (whichever comes first).

The owner of an Event Hub namespace can set the Throughput Units from 1 to 40.  The higher the TU, the higher the resulting Ingress / Egress is. If an Event Hub namespace surpasses the allowed throughput, it will be throttled.
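The limits above can be turned into a quick back-of-the-envelope estimate. The sketch below is a minimal calculator, assuming 1 MB means 1,000,000 bytes, a steady event rate, and that everything written is read once. The function name and those simplifications are ours; actual throttling behaviour may differ.

```python
import math

# Per-TU limits for the standard tier, as listed in the text.
INGRESS_MB_PER_TU = 1.0
INGRESS_EVENTS_PER_TU = 1000
EGRESS_MB_PER_TU = 2.0
EGRESS_EVENTS_PER_TU = 4096
MAX_TUS_PER_NAMESPACE = 40


def required_throughput_units(events_per_second, avg_event_bytes):
    """Estimate the TUs a namespace needs for a steady event rate."""
    mb_per_second = events_per_second * avg_event_bytes / 1_000_000
    ingress = max(math.ceil(mb_per_second / INGRESS_MB_PER_TU),
                  math.ceil(events_per_second / INGRESS_EVENTS_PER_TU))
    egress = max(math.ceil(mb_per_second / EGRESS_MB_PER_TU),
                 math.ceil(events_per_second / EGRESS_EVENTS_PER_TU))
    # Whichever limit is hit first determines the TU count.
    return max(1, ingress, egress)


tus = required_throughput_units(events_per_second=7000, avg_event_bytes=500)
print(tus, tus <= MAX_TUS_PER_NAMESPACE)
```

With small events, the events-per-second limit on ingress tends to be the binding constraint. If the estimate exceeds 40, that is a sign the Event Hubs should be split across namespaces.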

PeteStern_1-1729521696629.png

The simplest solution is to set the TUs to auto-inflate, which means the Event Hub namespace will increase the number of Throughput Units automatically to match your throughput needs.  However, it will not automatically bring the TUs back down if the traffic decreases.  Bringing the TUs back down requires manual intervention.

Designing for Performance

This section provides various approaches to architecting your Event Hubs and Spark setup, depending on your goals. Find the section(s) that match your goals and situation.

Performance Definitions

First, here are some basic definitions:

Throughput: Amount of data that can be processed in a given amount of time

Latency: Time it takes for a row of data to make it through the pipeline after it arrives in the Event Hub

Cost: $

Maximizing Throughput

For maximum throughput, read the data from files in ADLS instead of directly from Event Hubs.  Event Hubs Capture will write the data from Event Hubs to ADLS for you, and Spark can read the data from there.  This approach is efficient and can be cost-effective. See the section “Micro-batches = bursty request pattern” for why this approach works so well with Spark streaming.  

Event Hubs Capture allows you to set triggers by time and data size:

PeteStern_2-1729521696660.png

Once the data is in ADLS, you’re golden. Spark can easily read millions of events per second from files in ADLS, so this should resolve any throughput or scaling problems you may face when reading from Event Hubs. It’s also super simple and super scalable. However, keep in mind the additional storage and capture cost of this approach. Consider pairing Event Hubs Capture with Auto Loader.

To keep costs low, you can schedule a job to run once per day or once per hour to process the files.  This means you don’t need to have a Spark cluster running 24x7 to capture the data, lowering computation costs significantly.  The downside of this approach is that latencies will be on the order of hours since your cluster is not always running, but if you don’t have low latency requirements, this can be an excellent solution.
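A scheduled job of this kind might look like the sketch below, which uses Auto Loader with an AvailableNow trigger so the job processes whatever files have arrived and then stops. It assumes Capture is writing Avro files (its default format) and that the code runs on Databricks, where the `cloudFiles` source is available. The function names, paths, and table name are hypothetical placeholders.

```python
def capture_reader_options(schema_location):
    """Auto Loader options for reading Event Hubs Capture output."""
    return {
        "cloudFiles.format": "avro",  # Capture writes Avro files by default
        "cloudFiles.schemaLocation": schema_location,  # where Auto Loader tracks the schema
    }


def process_captured_files(spark, capture_path, target_table, checkpoint_path):
    """Process all new Capture files once, then stop (suitable for a scheduled job)."""
    events = (
        spark.readStream
        .format("cloudFiles")
        .options(**capture_reader_options(f"{checkpoint_path}/schema"))
        .load(capture_path)
    )
    return (
        events.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # drain the backlog, then shut down
        .toTable(target_table)
    )
```

Because the checkpoint remembers which files were already processed, each scheduled run only picks up files that arrived since the previous run.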

If you have a latency requirement of less than 15 minutes, see the next section.

Minimizing Latency

If latency under 15 minutes is important to you, you will need to have a Spark cluster streaming from Event Hubs at all times. Depending on your throughput requirements, you may need to scale up your Event Hub by increasing partitions, increasing Throughput Units, or upgrading the Event Hub tier. You may also need to increase the size of your Spark cluster. Implementation details will be important to keep costs reasonable and latencies low. See the section “Implementing for Performance”.

Another trick to keeping latencies low, if you want to shave seconds off, is to set the maxOffsetsPerTrigger as low as possible.  See the “maxOffsetsPerTrigger” section for the nuances of this important configuration.  

Minimizing Cost

The solution in the “Maximizing Throughput” section is also a good solution for minimizing cost, but the tradeoff for this solution is longer latencies.  If you have low latency requirements as well, the key to keeping costs low will be to utilize the smallest Spark cluster and the fewest Event Hub partitions possible.  See the section “Implementing for Performance” to learn how to best optimize Spark’s connection to Event Hubs.

If your EventHub is low throughput (<10 events per second), you may find that reading directly from Event Hub using the AvailableNow trigger will be the most cost-effective approach.

Implementing for Performance

This section addresses various implementation details you will have to be aware of to achieve the best performance reading Event Hubs from Spark.

Connecting to Event Hubs

Event Hubs supports multiple protocols, including the Kafka protocol. This means that you can use either the Event Hubs connector or the Kafka connector that’s included with Spark to connect to Event Hubs. Unfortunately, as of the publishing of this blog, the Event Hubs connector has not had a new release in two years. The good news is that you can simply use the Kafka connector instead. The Kafka connector is also quite a bit faster and more stable than the Event Hubs connector. Our experiments have shown that you can get ~5,000 events per second from each Event Hub partition using the Event Hubs connector but ~10,000 events per second using the Kafka connector. Given the lack of maintenance and poorer performance of the Event Hubs connector, we suggest skipping it and using the Kafka connector instead.

Use the Spark Kafka Integration Guide or Databricks Kafka Integration Guide, depending on your platform, to get started with the Kafka connector. A few terms are different between Kafka and Event Hubs, but Microsoft has put out a handy guide to translate for you.
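For orientation, here is a minimal sketch of the options the Kafka connector typically needs to reach an Event Hubs Kafka endpoint: the namespace's port 9093, SASL_SSL with the PLAIN mechanism, and the literal username `$ConnectionString`. The helper names and the `on_databricks` switch are our own. The switch reflects our understanding that Databricks ships a shaded Kafka client, so the login module class name carries a `kafkashaded.` prefix there. Verify this against the integration guide for your platform.

```python
def event_hubs_kafka_options(namespace, event_hub, connection_string,
                             on_databricks=True):
    """Build Kafka source options for reading an Event Hub over the Kafka protocol."""
    login_module = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
        if on_databricks
        else "org.apache.kafka.common.security.plain.PlainLoginModule"
    )
    jaas_config = (
        f"{login_module} required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "subscribe": event_hub,  # the Event Hub name plays the role of the Kafka topic
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }


def read_event_hub(spark, options):
    """Create a streaming DataFrame from an Event Hub using the Kafka connector."""
    return spark.readStream.format("kafka").options(**options).load()
```

In practice the connection string should come from a secret store rather than being written into a notebook. The namespace and Event Hub names you pass in are your own.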

Setting the Right Number of Event Hub Partitions

You may find your throughput constrained if you don’t have enough Event Hub partitions. In our experiments, we were able to get 10k events per second from each partition, but the throughput you get will depend on event size and TU limits. We tested with a message size of < 1KB. Remember not to set your number of partitions too high, or you will end up creating a lot of extra work for Spark that will slow down your stream.

'maxOffsetsPerTrigger' - The secret sauce for high performance

maxOffsetsPerTrigger is the Spark parameter that determines how many events will be read in each micro-batch. It is set in the following manner:

 

df =  (spark
        .readStream
        .format("kafka")
        .option("maxOffsetsPerTrigger", 8000) # <----
        ...
      )

 

In this case, Spark will read 8000 events from Event Hub, process them, then grab another batch of 8000 events, etc. If an Event Hub has multiple partitions, Spark will create one task per partition and read roughly maxOffsetsPerTrigger / numPartitions events from each partition (the limit is split proportionally to how much data each partition has waiting). For example, if this Event Hub has 10 evenly loaded partitions, we will read 800 events from each partition, which is quite low.

So this turns out to be a really important configuration. Set it too low, and you will get lower throughput because you’re reading tiny batches from Event Hubs. Set it too high, and you will get throttled. When you set maxOffsetsPerTrigger, keep in mind that Event Hubs can serve 5,000-10,000 events per second per partition. Experimentally we found that the sweet spot for maxOffsetsPerTrigger is 10,000-20,000 x numPartitions, but you will need to experiment with this yourself as it will vary per use case.
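The arithmetic is simple enough to sketch. The two helper functions below are hypothetical names of our own. They encode the even split across partitions and the 10,000 to 20,000 events per partition starting range from our experiments, and your own sweet spot may well differ.

```python
def per_partition_batch(max_offsets_per_trigger, num_partitions):
    """Events read from each partition per micro-batch, assuming an even split."""
    return max_offsets_per_trigger // num_partitions


def suggested_max_offsets_range(num_partitions, low_per_partition=10_000,
                                high_per_partition=20_000):
    """Starting range for maxOffsetsPerTrigger based on our experimental sweet spot."""
    return (low_per_partition * num_partitions,
            high_per_partition * num_partitions)


print(per_partition_batch(8000, 10))    # 800 events per partition
print(suggested_max_offsets_range(10))  # (100000, 200000)
```

For the 10-partition Event Hub in the example, that means starting somewhere between 100,000 and 200,000 rather than 8000.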

Also, the lower the maxOffsetsPerTrigger, the lower the latency. However, a lower maxOffsetsPerTrigger also means lower throughput, so if you set it too low, your processing will fall behind and latencies will rise. It’s a balancing act. When in doubt, set maxOffsetsPerTrigger higher rather than lower so you can handle bursts of data.

PeteStern_3-1729521696643.png

Setting the Number of Spark Tasks

One quirk is that by default, one Spark task will be assigned to each Event Hub partition per micro-batch.  This means that there can be a lot of overhead when reading from multiple Event Hub partitions.  Thirty Event Hub partitions will get 30 Spark tasks, which is probably more than you need.  If you have more Event Hub partitions than CPU cores on your Spark cluster, or you want to save cost by decreasing cores, you can: 

  • Decrease the number of Event Hub partitions
  • Coalesce Spark tasks.
    • df.coalesce(<num cores>)
    • This will result in each Spark task reading multiple Event Hub partitions
  • Increase the number of cores per worker using the SPARK_WORKER_CORES env variable

There are also some cases where you need more horsepower.  You can have multiple tasks read a single partition with the minPartitions setting.  If you’d like each partition to be read by two tasks, set minPartitions to 2 x <number of Event Hub partitions>
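Both directions can be sketched in a few lines. `minPartitions` is an option of Spark's Kafka source and `coalesce` is a standard DataFrame method. The helper names and the partition and core counts below are hypothetical.

```python
def min_partitions_option(num_event_hub_partitions, tasks_per_partition=2):
    """Kafka source option asking Spark to split each partition across several tasks."""
    return {"minPartitions": str(num_event_hub_partitions * tasks_per_partition)}


def coalesce_to_cores(df, num_cores):
    """Reduce the number of Spark tasks so each one handles multiple Event Hub partitions."""
    return df.coalesce(num_cores)


# More horsepower: 8 Event Hub partitions read by 16 tasks.
print(min_partitions_option(8))

# Less overhead (sketch): apply to the streaming DataFrame after .load(), e.g.
#   df = coalesce_to_cores(df, 16)   # 30 partitions handled by 16 tasks
```

The `minPartitions` option goes on the reader alongside the other Kafka options. `coalesce` is applied to the DataFrame that the reader returns.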

Micro-batches = bursty request pattern

Event Hub works best with a constant request pattern.  However, Spark reads tend to be bursty due to its micro-batch architecture.  For each micro-batch, Spark will:

  1. Read data from Event Hub
  2. Process data
  3. Write output
  4. Update streaming state

While Spark is processing, writing, and updating state, it is not reading anything from Event Hub.  So you end up with a request pattern like this:

PeteStern_4-1729521696600.png

 All that time with 0 throughput has an impact on the overall throughput.  The more time you spend processing and managing the state, the more time is spent not reading from Event Hub.  
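A small model makes the effect visible. It assumes each micro-batch reads at a constant rate for some seconds and then spends the rest of the batch processing, writing, and updating state without reading. The function name and the numbers are illustrative only.

```python
def effective_throughput(read_rate, read_seconds, other_seconds):
    """Average events/sec over a micro-batch that only reads part of the time.

    read_rate: events/sec while Spark is actually reading from Event Hub.
    read_seconds: time spent reading in each micro-batch.
    other_seconds: time spent processing, writing, and updating state.
    """
    batch_seconds = read_seconds + other_seconds
    return read_rate * read_seconds / batch_seconds


# Reading at 10,000 events/sec for 2 s, then 3 s of processing and state updates:
print(effective_throughput(10_000, 2, 3))  # 4000.0 events/sec on average
```

In this model, halving the non-read time from 3 seconds to 1.5 seconds raises the average from 4,000 to roughly 5,700 events per second without touching the Event Hub at all.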

Optimizing Your Micro-Batch Stream

Aggressively optimizing your Spark stream can make a huge difference to the stream’s throughput, cost, and latency when reading from Event Hubs.

Enabling Changelog Checkpointing and Async Checkpointing in Spark can help decrease the overhead associated with updating the streaming state.  Another option is to read from files instead, which doesn’t suffer from the burstiness problem, but that will increase your latency. 
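As a hedged sketch of what enabling these looks like, the configuration keys below are the ones we recall from the Databricks documentation for RocksDB changelog checkpointing and asynchronous state checkpointing. Treat the exact keys and values as assumptions to verify against the docs for your runtime version, including whether both features can be combined. `apply_confs` is a hypothetical helper.

```python
# Assumed keys: verify against the documentation for your Databricks Runtime.
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"

CHANGELOG_CHECKPOINTING = {
    "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true",
}

ASYNC_CHECKPOINTING = {
    "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled": "true",
}


def apply_confs(spark, confs):
    """Set each configuration on the active Spark session before starting the stream."""
    for key, value in confs.items():
        spark.conf.set(key, value)
```

These settings only matter for stateful streams (aggregations, joins, deduplication), and they should be set before the streaming query starts.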

Stream Pipelining

Stream pipelining is available in serverless Delta Live Tables and Serverless Workflows and is automatically enabled in these environments.  Stream pipelining parallelizes the reading and processing of data by running five micro-batches in parallel (at the time of publication), decreasing the burstiness problem.

If using pipelining you may want to set a higher maxOffsetsPerTrigger to maximize throughput.

The future: Real-Time Streaming Mode

In the long run, Databricks will implement a real-time streaming mode, which will do away with micro-batches and fully solve this problem. Real-time streaming mode was announced by Databricks at the 2024 Data & AI Summit, but it was still in development when this blog was written. Watch the presentation here and keep your eyes open for this feature. By the time you’re reading this, maybe it will already be available! Speak to your Databricks account team to get enrolled in the preview if it’s not yet GA.

Other Issues

Not all partitions have the same performance.  The nature of the cloud environment means that some partitions could be on hardware that has more load or there could be network issues, etc.  There could also be skew, so a few partitions end up getting more data than the others. Unfortunately, each micro-batch will read the prescribed number of events (set by maxOffsetsPerTrigger) before the batch can be completed.  This means that a single slow partition can slow down the entire job.  You should be able to see this in the Stage Timeline in the Spark UI.  If some tasks are taking a lot longer than others, you may be facing a slow partition.  It also shows up in the executor logs. If you are facing this problem, you may need to reconsider how events are getting mapped to your Event Hub partitions.  Otherwise, there’s nothing more to do than debug why your partition is running slowly.
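If you copy per-task durations out of the Stage Timeline, a quick check like the one below can flag outliers. This is a minimal sketch: the function name, the sample durations, and the "more than twice the median" threshold are all our own arbitrary choices, not something the Spark UI provides.

```python
from statistics import median


def find_slow_partitions(task_seconds, factor=2.0):
    """Return partitions whose task duration exceeds `factor` times the median.

    task_seconds: dict mapping Event Hub partition id -> task duration in seconds.
    """
    typical = median(task_seconds.values())
    return sorted(p for p, secs in task_seconds.items() if secs > factor * typical)


# Hypothetical durations for one micro-batch across four partitions.
durations = {0: 4.1, 1: 3.9, 2: 4.3, 3: 12.8}
print(find_slow_partitions(durations))  # [3]
```

If the same partition shows up batch after batch, look at how events are mapped to it before blaming the hardware.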

Other troubleshooting can be done with the Spark UI. Here’s a walkthrough of how to use the Spark UI to troubleshoot performance issues.

Conclusion

We’ve laid out how EventHubs scales, architectural approaches to maximizing throughput, minimizing latency, and minimizing cost, and implementation details for achieving high performance.  We’ve also seen that optimizing streaming use cases with Event Hub and Spark can be done by either scaling the Event Hub or Spark side. 

Figure: Overview of Event Hubs and Performance Optimizations

On the Event Hub side, we can work with either the sizing of the Event Hub or its underlying partitioning design, as bigger Event Hub sizes will allow more throughput. 

On Spark’s side, we found that the maxOffsetsPerTrigger parameter impacts both latency and throughput. Tuning the number of tasks can also reduce overhead on the Spark side, as multiple Event Hub partitions can be read by a single task. Lastly, optimizing your micro-batches by enabling changelog or async checkpointing, switching to stream pipelining on serverless compute, adopting real-time streaming mode, or simply reading the data from files can improve overall performance.

The table below gives a summary of the proposed concepts. Please refer to each section for a detailed explanation. 

Hopefully, you’ve found this guide helpful in your EventHubs and Spark journey! 

 

| Feature | Conceptual impact | Design considerations | Implementation comments |
| --- | --- | --- | --- |
| Spark optimization | Cost, throughput, latency | A well-optimized Spark job will have higher throughput, lower latency, and lower cost. | See the following guides: |
| Event Hub partitions | Cost, throughput, latency | Select the number of Event Hub partitions based on the rate of events per second. We were able to read 10k events per second per partition, but your rate will depend on your event size. | Be aware of your Event Hub tier. Some tiers don’t allow resizing, so you’ll have to create a new Event Hub. Other tiers allow only increasing the number of partitions, not decreasing. |
| Event Hub throttling | Cost, throughput, latency | Different Event Hub tiers have different capacities you can configure. For example, the Standard tier has Throughput Units (TUs). The more TUs you pay for, the more throughput you get before being throttled. This is a complex topic since each Event Hub tier has a different model. Refer to the Azure docs. | |
| Spark tuning with maxOffsetsPerTrigger | Throughput, latency | The lower the maxOffsetsPerTrigger, the lower the latency, but you’ll get less throughput. The higher the maxOffsetsPerTrigger, the higher the throughput, until you get throttled by Event Hubs. | Start with a higher maxOffsetsPerTrigger than you think you need, and reduce it if the latency is too high. With a higher maxOffsetsPerTrigger, the stream is less likely to fall behind the Event Hub. |
| Tuning number of Spark tasks | Drives cluster size requirements | Having the right number of Spark tasks for each batch is important for performance. Too many tasks, and there will be overhead. Too few, and you will not have enough horsepower. | Tied to partitioning: by default, 1 task per partition. Reduce the number of tasks using .coalesce(<num cores>). If you think you need more tasks, you can use the minPartitions setting. |
| Optimize Spark | Throughput, cost, latency | Use out-of-the-box Spark configurations to decrease overhead for streaming state. Consider using new features which are available on DLT and serverless pipelines. | See the following guides: Features/Techniques: |

1 Comment
radothede
Contributor II

Great job, thanks for sharing!

Some small hints from my end:

  • If you also care about the latency of processing bronze to silver using Structured Streaming, setting the parameter maxFilesPerTrigger will be important.
  • Start with a small cluster setup (I've used a DS3_v2 single node for Kafka to bronze for each topic using the Kafka connector); if needed, scale vertically.
  • Consider setting the retries to -1; jobs will fail once in a while and you probably want them to restart automatically.

Best!