spark-streaming read from specific event hub partition
12-08-2022 12:07 PM
The Azure event hub "my_event_hub" has a total of 5 partitions ("0", "1", "2", "3", "4").
The readStream should only read events from partitions "0" and "4".
Event hub configuration as streaming source:
val name = "my_event_hub"
val connectionString = "my_event_hub_connection_string"
val max_events = 50
val positions = Map(
new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPositions(start)
.setMaxEventsPerTrigger(max_Events)
Official doc for structured-streaming-eventhubs-integration: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-inte...
Using the above configuration, the streaming application reads from all 5 partitions of the event hub. Can we read from specific partitions only?
For example, read events only from the 2 partitions "0" and "4", with the checkpoint and offsets pointing at those specific partitions.
12-08-2022 10:03 PM
Hi @Sandesh Puligundla
For those specific partitions, can you try giving the start and end positions as a single value or a non-existent number and see if that excludes them?
12-19-2022 09:35 AM
@Uma Maheswara Rao Desula Do you mean creating a config map such that the start and end event positions (offsets) on those partitions are the same?
Example 1, in both starting and ending positions:
EventPosition.fromOffset("1")
Example 2, in both starting and ending positions:
EventPosition.fromSequenceNumber(100L)
Theoretically, with this kind of configuration map, where the starting and ending positions share the same offset or sequence number, we can control how many records are ingested from each partition of the event hub.
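A sketch of that idea in the connector's JSON/PySpark configuration style (the position-dict shape follows the snippet later in this thread; using offset "-1" so the sequence number takes effect is an assumption, not confirmed here): a partition whose starting and ending positions carry the same sequence number should contribute no events.

```python
import json

eh_name = "my_event_hub"  # event hub name from the thread

def position(seq_no):
    # Event-position dict in the JSON shape the connector accepts
    # (offset "-1" deferring to seqNo is an assumption here).
    return {"offset": "-1", "seqNo": seq_no,
            "enqueuedTime": None, "isInclusive": True}

def key(partition_id):
    # Position-map keys are JSON-encoded NameAndPartition strings
    return json.dumps({"ehName": eh_name, "partitionId": partition_id})

# Partition 2: start and end at the same sequence number,
# so that partition should contribute (close to) zero events.
starting = {key(2): position(100)}
ending = {key(2): position(100)}

ehConf = {
    "eventhubs.startingPositions": json.dumps(starting),
    "eventhubs.endingPositions": json.dumps(ending),
}
```

Whether this actually throttles a partition to zero records would still need to be verified against a running stream.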
12-12-2022 03:03 AM
Hi @Sandesh Puligundla,
The approach you have written is the right one for reading from specific partitions. The incorrect behavior comes from a typo: you should pass the positions variable instead of start.
val positions = Map(
new NameAndPartition(name, 0) -> EventPosition.fromEndOfStream,
new NameAndPartition(name, 4) -> EventPosition.fromEndOfStream
)
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPositions(positions)
.setMaxEventsPerTrigger(max_events)
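For reference, the same two-partition starting-position map can be sketched in the JSON/PySpark style the connector also accepts (a sketch mirroring the Scala positions map for partitions 0 and 4; "@latest" is the JSON counterpart of EventPosition.fromEndOfStream):

```python
import json

eh_name = "my_event_hub"

# "@latest" is the JSON counterpart of EventPosition.fromEndOfStream
end_of_stream = {"offset": "@latest", "seqNo": -1,
                 "enqueuedTime": None, "isInclusive": True}

# One entry per desired partition (0 and 4), keyed by a
# JSON-encoded NameAndPartition string
positions = {
    json.dumps({"ehName": eh_name, "partitionId": p}): end_of_stream
    for p in (0, 4)
}

ehConf = {
    "eventhubs.startingPositions": json.dumps(positions),
    "eventhubs.maxEventsPerTrigger": "50",
}
```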
01-17-2023 09:47 AM
I tried using the below snippet to receive messages only from partition id 0:
import json

ehName = "<<EVENT-HUB-NAME>>"
ehConf = {}
# Create event position for partition 0
positionKey1 = {
"ehName": ehName,
"partitionId": 0
}
eventPosition1 = {
"offset": "@latest",
"seqNo": -1,
"enqueuedTime": None,
"isInclusive": True
}
# Put the rules into a map. The position key dictionaries must be made into JSON strings to act as the key.
positionMap = {
json.dumps(positionKey1) : eventPosition1
}
# Place the map into the main Event Hub config dictionary
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
ehConf["eventhubs.connectionString"] = <eventhub_connection_string>
ehConf["eventhubs.consumerGroup"] = <eventhub_consumer_group>
df = spark \
.readStream \
.format("eventhubs") \
.schema(jsonSchema) \
.options(**conf) \
.load()
But I still receive messages from other partitions as well.
Can anyone point out what I am missing?
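One thing worth double-checking (an assumption from reading the snippet, not a verified fix): the options are collected in ehConf, but the reader is created with .options(**conf); if conf is a different dictionary, the eventhubs.startingPositions entry may never reach the stream. A consistent-name sketch of the config assembly:

```python
import json

ehName = "<<EVENT-HUB-NAME>>"  # placeholder kept from the original post

positionKey1 = {"ehName": ehName, "partitionId": 0}
eventPosition1 = {"offset": "@latest", "seqNo": -1,
                  "enqueuedTime": None, "isInclusive": True}

# Collect everything in ONE dict and hand that same dict to the reader
ehConf = {
    "eventhubs.startingPositions":
        json.dumps({json.dumps(positionKey1): eventPosition1}),
    # "eventhubs.connectionString": ...,  # as in the original post
    # "eventhubs.consumerGroup": ...,     # as in the original post
}

# df = (spark.readStream
#         .format("eventhubs")
#         .options(**ehConf)   # note: ehConf, not conf
#         .load())
```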

