<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: spark-streaming read from specific event hub partition in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17775#M11728</link>
    <description>&lt;P&gt;Hi @Sandesh Puligundla​&amp;nbsp;,&lt;/P&gt;&lt;P&gt;The approach you have written is the only way to read from specific partitions. The incorrect behavior is caused by a&lt;B&gt; typo&lt;/B&gt;: I think you should use the &lt;B&gt;positions &lt;/B&gt;variable instead of &lt;B&gt;start.&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val positions = Map(
  new NameAndPartition(name, 0) -&amp;gt; EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -&amp;gt; EventPosition.fromEndOfStream
)
 
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(positions)
                    .setMaxEventsPerTrigger(max_Events)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 12 Dec 2022 11:03:56 GMT</pubDate>
    <dc:creator>vivek_rawat</dc:creator>
    <dc:date>2022-12-12T11:03:56Z</dc:date>
    <item>
      <title>spark-streaming read from specific event hub partition</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17773#M11726</link>
      <description>&lt;P&gt;The Azure event hub "my_event_hub" has a total of 5 partitions ("0", "1", "2", "3", "4").&lt;/P&gt;&lt;P&gt;The readstream should only read events from partitions "0" and "4".&lt;/P&gt;&lt;P&gt;Event hub configuration as the streaming source:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val name = "my_event_hub"
val connectionString = "my_event_hub_connection_string"
val max_events = 50
&amp;nbsp;
&amp;nbsp;
val positions = Map(
  new NameAndPartition(name, 0) -&amp;gt; EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -&amp;gt; EventPosition.fromEndOfStream
)
&amp;nbsp;
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(start)
                    .setMaxEventsPerTrigger(max_Events)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Official documentation for structured-streaming-eventhubs-integration:&amp;nbsp;&lt;A href="https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md" alt="https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md" target="_blank"&gt;https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-integration.md&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Using the above configuration, the streaming application reads from all 5 partitions of the event hub. Can we read from specific partitions only?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;For example, read events only from the 2 partitions "0" and "4", with the checkpoint and offsets pointed to those specific partitions.&lt;/P&gt;</description>
      <pubDate>Thu, 08 Dec 2022 20:07:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17773#M11726</guid>
      <dc:creator>Sandesh87</dc:creator>
      <dc:date>2022-12-08T20:07:53Z</dc:date>
    </item>
    <item>
      <title>Re: spark-streaming read from specific event hub partition</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17774#M11727</link>
      <description>&lt;P&gt;Hi @Sandesh Puligundla​&amp;nbsp;&lt;/P&gt;&lt;P&gt;For those specific partitions, can you try giving the start and end positions as a single value or a nonexistent number and see if that excludes them?&lt;/P&gt;</description>
      <pubDate>Fri, 09 Dec 2022 06:03:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17774#M11727</guid>
      <dc:creator>UmaMahesh1</dc:creator>
      <dc:date>2022-12-09T06:03:59Z</dc:date>
    </item>
    <item>
      <title>Re: spark-streaming read from specific event hub partition</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17775#M11728</link>
      <description>&lt;P&gt;Hi @Sandesh Puligundla​&amp;nbsp;,&lt;/P&gt;&lt;P&gt;The approach you have written is the only way to read from specific partitions. The incorrect behavior is caused by a&lt;B&gt; typo&lt;/B&gt;: I think you should use the &lt;B&gt;positions &lt;/B&gt;variable instead of &lt;B&gt;start.&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val positions = Map(
  new NameAndPartition(name, 0) -&amp;gt; EventPosition.fromEndOfStream,
  new NameAndPartition(name, 4) -&amp;gt; EventPosition.fromEndOfStream
)
 
val eventHubsConf = EventHubsConf(connectionString)
                    .setStartingPositions(positions)
                    .setMaxEventsPerTrigger(max_Events)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 12 Dec 2022 11:03:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17775#M11728</guid>
      <dc:creator>vivek_rawat</dc:creator>
      <dc:date>2022-12-12T11:03:56Z</dc:date>
    </item>
    <item>
      <title>Re: spark-streaming read from specific event hub partition</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17776#M11729</link>
      <description>&lt;P&gt;&lt;/P&gt;&lt;P&gt;@Uma Maheswara Rao Desula​&amp;nbsp;Do you mean creating a config map such that the offsets (event positions) of the start and end event positions on the partitions are the same?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Example1: In both starting and ending positions:-&lt;/P&gt;&lt;P&gt;EventPosition.fromOffset("1")      &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Example2: In both starting and ending positions:-&lt;/P&gt;&lt;P&gt;EventPosition.fromSequenceNumber(100L)    &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Theoretically, with this kind of configuration map, where the starting and ending positions have the same offset or sequence number, we can control how many records are ingested from each partition of the event hub.&lt;/P&gt;</description>
      <pubDate>Mon, 19 Dec 2022 17:35:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17776#M11729</guid>
      <dc:creator>Sandesh87</dc:creator>
      <dc:date>2022-12-19T17:35:27Z</dc:date>
    </item>
    <item>
      <title>Re: spark-streaming read from specific event hub partition</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17777#M11730</link>
      <description>&lt;P&gt;I tried using the snippet below to receive messages &lt;B&gt;only&lt;/B&gt; from partition id=0&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;ehName = "&amp;lt;&amp;lt;EVENT-HUB-NAME&amp;gt;&amp;gt;"
&amp;nbsp;
# Create event position for partition 0
positionKey1 = {
  "ehName": ehName,
  "partitionId": 0
}
&amp;nbsp;
eventPosition1 = {
  "offset": "@latest",    
  "seqNo": -1,            
  "enqueuedTime": None,   
  "isInclusive": True
}
&amp;nbsp;
# Put the rules into a map. The position key dictionaries must be made into JSON strings to act as the key.
positionMap = {
  json.dumps(positionKey1) : eventPosition1
}
&amp;nbsp;
# Place the map into the main Event Hub config dictionary
ehConf["eventhubs.startingPositions"] = json.dumps(positionMap)
ehConf["eventhubs.connectionString"] = &amp;lt;eventhub_connection_string&amp;gt;
ehConf["eventhubs.consumerGroup"] = &amp;lt;eventhub_consumer_group&amp;gt;
&amp;nbsp;
df = spark \
    .readStream \
    .format("eventhubs") \
    .schema(jsonSchema) \
    .options(**conf) \
    .load()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt; But I still receive messages from other partitions as well.&lt;/P&gt;&lt;P&gt;Can anyone point out what I am missing?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 17 Jan 2023 17:47:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-read-from-specific-event-hub-partition/m-p/17777#M11730</guid>
      <dc:creator>keshav</dc:creator>
      <dc:date>2023-01-17T17:47:16Z</dc:date>
    </item>
  </channel>
</rss>

