spark-streaming read from specific event hub partition

Sandesh87
New Contributor III

The Azure Event Hub "my_event_hub" has a total of 5 partitions ("0", "1", "2", "3", "4").

The readStream should only read events from partitions "0" and "4".

Event Hub configuration as streaming source:

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition, NameAndPartition}
 
val name = "my_event_hub"
val connectionString = "my_event_hub_connection_string"
val max_events = 50
 
// Start both partitions from the end of the stream
val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)
 
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(start)
                    .setMaxEventsPerTrigger(max_events)

Official doc for the Structured Streaming + Event Hubs integration: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-inte...

Using the above configuration, the streaming application reads from all 5 partitions of the event hub. Can we read from specific partitions only?

For example, read events only from the 2 partitions "0" and "4", with the checkpoint and offsets pointing to those specific partitions.

4 REPLIES

UmaMahesh1
Honored Contributor III

Hi @Sandesh Puligundla,

For those specific partitions, can you try giving the start and end positions as a single value, or as a non-existent number, and see if that excludes them?

Sandesh87
New Contributor III

@Uma Maheswara Rao Desula Do you mean creating a config map such that the offsets (event positions) of the start and end positions on those partitions are the same?

Example 1: in both starting and ending positions:

EventPosition.fromOffset("1")

Example 2: in both starting and ending positions:

EventPosition.fromSequenceNumber(100L)

Theoretically, with this kind of configuration map, where the starting and ending positions share the same offset or sequence number, we could control how many records are ingested from each partition of the event hub.
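
Concretely, a sketch of such a map (the sequence number is a hypothetical value; name and connectionString are as defined in my question, and I'm assuming the setEndingPositions setter from EventHubsConf, which the integration guide discusses mainly for batch queries, so a stream may not honor it):

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition, NameAndPartition}
 
// Pin the start and end of a partition to the same event position,
// so the range covers zero events if it is honored.
val samePosition = EventPosition.fromSequenceNumber(100L)
 
val startingPositions = Map(new NameAndPartition(name, 1) -> samePosition)
val endingPositions   = Map(new NameAndPartition(name, 1) -> samePosition)
 
val cappedConf = EventHubsConf(connectionString)
  .setStartingPositions(startingPositions)
  .setEndingPositions(endingPositions)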

vivek_rawat
New Contributor III

Hi @Sandesh Puligundla,

The approach you have written is the only approach to read from specific partitions. The reason for the incorrect behavior is a typo: you should use the positions variable instead of start.

val positions = Map(
  new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)
 
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(positions)
                    .setMaxEventsPerTrigger(max_events)
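
With that fix in place, wiring the conf into the stream follows the integration guide (a sketch; spark here is assumed to be the active SparkSession):

// Read from Event Hubs using the corrected configuration
val df = spark
  .readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()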

keshav
New Contributor II

I tried using the below snippet to receive messages only from partition id 0:

import json
 
ehName = "<<EVENT-HUB-NAME>>"
 
# Create event position for partition 0
positionKey1 = {
  "ehName": ehName,
  "partitionId": 0
}
 
eventPosition1 = {
  "offset": "@latest",     # start from the latest event
  "seqNo": -1,             # -1 means the sequence number is not used
  "enqueuedTime": None,    # enqueued-time filter is not used
  "isInclusive": True
}
 
# Put the rules into a map. The position key dictionaries must be
# serialized to JSON strings to act as the keys.
positionMap = {
  json.dumps(positionKey1): eventPosition1
}
 
# Place the map into the main Event Hub config dictionary
ehConf = {}
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
ehConf["eventhubs.connectionString"] = "<<EVENT-HUB-CONNECTION-STRING>>"
ehConf["eventhubs.consumerGroup"] = "<<EVENT-HUB-CONSUMER-GROUP>>"
 
df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

But I still receive messages from other partitions as well.

Can anyone point out what I am missing?
