Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
spark-streaming read from specific event hub partition

Sandesh87
New Contributor III

The Azure event hub "my_event_hub" has 5 partitions in total ("0", "1", "2", "3", "4").

The readStream should read events only from partitions "0" and "4".

Event hub configuration as the streaming source:

import org.apache.spark.eventhubs._

val name = "my_event_hub"
val connectionString = "my_event_hub_connection_string"
val max_events = 50

// Start partitions 0 and 4 from the end of the stream
val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)

val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(start)
                    .setMaxEventsPerTrigger(max_events)

Official doc for structured-streaming-eventhubs-integration: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-inte...

With the above configuration, the streaming application reads from all 5 partitions of the event hub. Can we read from specific partitions only?

For example, read events only from partitions "0" and "4", with the checkpoint and offsets pointing to those partitions.
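For readers on PySpark, the Scala positions map above can be expressed as the JSON string the connector reads from the "eventhubs.startingPositions" option. This is a minimal sketch: the helper name starting_positions_json is hypothetical, the event hub name and partitions are taken from the question, and "@latest" is used as the JSON counterpart of EventPosition.fromEndOfStream. Judging by the last reply in this thread, setting positions for a subset of partitions does not by itself stop the other partitions from being read.

```python
import json


def starting_positions_json(eh_name, partition_ids, offset="@latest"):
    """Hypothetical helper: build the value for "eventhubs.startingPositions".

    Each key is itself a JSON string naming (event hub, partition id);
    each value is an event position for that partition.
    """
    position_map = {}
    for pid in partition_ids:
        key = json.dumps({"ehName": eh_name, "partitionId": int(pid)})
        position_map[key] = {
            "offset": offset,       # "@latest" = end of stream
            "seqNo": -1,            # not used when an offset is given
            "enqueuedTime": None,   # not used when an offset is given
            "isInclusive": True,
        }
    return json.dumps(position_map)


ehConf = {}
ehConf["eventhubs.startingPositions"] = starting_positions_json("my_event_hub", [0, 4])
```

The resulting dictionary is what would be passed to .options(**ehConf) on spark.readStream.format("eventhubs").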


UmaMahesh1
Honored Contributor III

Hi @Sandesh Puligundla,

For those specific partitions, can you try setting the start and end positions to the same value, or to a non-existent number, and see whether that excludes them?

Uma Mahesh D

Sandesh87
New Contributor III

@Uma Maheswara Rao Desula Do you mean creating a config map in which the start and end event positions (offset or sequence number) on those partitions are the same?

Example 1, in both the starting and ending positions:

EventPosition.fromOffset("1")

Example 2, in both the starting and ending positions:

EventPosition.fromSequenceNumber(100L)

In theory, with a configuration map where the starting and ending positions share the same offset or sequence number, we could control how many records are ingested from each partition of the event hub.
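A sketch of the configuration map proposed here, in the PySpark JSON form, assuming the goal is to pin the partitions you do not want (1, 2 and 3 in this example) to the same sequence number at both ends. The helper name same_start_and_end_positions and the value 100 are illustrative. Treat the "eventhubs.endingPositions" part as an assumption to verify: the connector's integration guide describes ending positions as a batch-query setting, so they may have no effect in a readStream.

```python
import json


def same_start_and_end_positions(eh_name, seq_no_by_partition):
    """Hypothetical helper: return (startingPositions, endingPositions) JSON
    strings in which every listed partition starts and ends at the same
    sequence number."""
    position_map = {}
    for pid, seq_no in seq_no_by_partition.items():
        key = json.dumps({"ehName": eh_name, "partitionId": int(pid)})
        position_map[key] = {
            "offset": None,          # not used: positioning by sequence number
            "seqNo": int(seq_no),
            "enqueuedTime": None,    # not used
            "isInclusive": True,
        }
    encoded = json.dumps(position_map)
    return encoded, encoded


start_json, end_json = same_start_and_end_positions(
    "my_event_hub", {1: 100, 2: 100, 3: 100}
)
ehConf = {
    "eventhubs.startingPositions": start_json,
    # Assumption: ending positions are honoured for batch reads only.
    "eventhubs.endingPositions": end_json,
}
```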

vivek_rawat
New Contributor III

Hi @Sandesh Puligundla,

The approach you have written is the only way to read specific partitions. I think the incorrect behavior comes from a typo: you should pass the positions variable to setStartingPositions instead of start.

val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)
 
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(positions)
                    .setMaxEventsPerTrigger(max_events)

keshav
New Contributor II

I tried the snippet below to receive messages only from partition 0:

import json

ehName = "<<EVENT-HUB-NAME>>"

# Create event position for partition 0
positionKey1 = {
  "ehName": ehName,
  "partitionId": 0
}

eventPosition1 = {
  "offset": "@latest",
  "seqNo": -1,
  "enqueuedTime": None,
  "isInclusive": True
}

# Put the rules into a map. The position key dictionaries must be made into JSON strings to act as the key.
positionMap = {
  json.dumps(positionKey1) : eventPosition1
}

# Place the map into the main Event Hub config dictionary
ehConf = {}
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
ehConf["eventhubs.connectionString"] = "<<EVENT-HUB-CONNECTION-STRING>>"
ehConf["eventhubs.consumerGroup"] = "<<EVENT-HUB-CONSUMER-GROUP>>"

df = spark \
    .readStream \
    .format("eventhubs") \
    .schema(jsonSchema) \
    .options(**ehConf) \
    .load()

But I still receive messages from the other partitions as well.

Can anyone point out what I am missing?
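If the connector keeps consuming every partition, as reported above, one possible workaround is to drop unwanted rows after load() using the source's "partition" column, which the connector's integration guide lists as a string. This is a sketch, not a partition-selective read: the other partitions are still fetched, and presumably still tracked in the checkpoint; their rows are only discarded. The helper name partition_filter_expr is hypothetical.

```python
def partition_filter_expr(partition_ids):
    """Hypothetical helper: build a Spark SQL predicate that keeps only rows
    from the given Event Hubs partitions. The source exposes the partition
    id as a string column named `partition`, so the ids are quoted."""
    quoted = ", ".join("'{}'".format(int(pid)) for pid in partition_ids)
    # Backticks keep `partition` from being parsed as a SQL keyword.
    return "`partition` IN ({})".format(quoted)


# Usage with the streaming DataFrame from the snippet above (requires Spark):
# filtered = df.filter(partition_filter_expr([0]))
```

DataFrame.filter accepts a SQL expression string, so the same predicate works unchanged in a streaming query.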
