<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic How to receive data from Azure Event Hub in Parquet? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-receive-data-from-azure-event-hub-in-parquet/m-p/4156#M956</link>
    <description>&lt;P&gt;import asyncio&lt;/P&gt;&lt;P&gt;import os&lt;/P&gt;&lt;P&gt;from azure.eventhub.aio import EventHubConsumerClient&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;CONNECTION_STR = "Connection_string"&lt;/P&gt;&lt;P&gt;EVENTHUB_NAME = "event_hub"&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_event(partition_context, event):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # If the operation is i/o intensive, async will have better performance.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Received event from partition: {}.".format(partition_context.partition_id))&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print(event)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_partition_initialize(partition_context):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Partition: {} has been initialized.".format(partition_context.partition_id))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_partition_close(partition_context, reason):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Partition: {} has been closed, reason for closing: {}.".format(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; partition_context.partition_id,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; reason&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; ))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_error(partition_context, error):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here. 
partition_context can be None in the on_error callback.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; if partition_context:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("An exception: {} occurred during receiving from Partition: {}.".format(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; partition_context.partition_id,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; error&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ))&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; else:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("An exception: {} occurred during the load balance process.".format(error))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def main():&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; client = EventHubConsumerClient.from_connection_string(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; conn_str=CONNECTION_STR,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer_group="$Default",&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; eventhub_name=EVENTHUB_NAME&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; )&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; async with client:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; await client.receive(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_event=on_event,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_error=on_error,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_partition_close=on_partition_close,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_partition_initialize=on_partition_initialize,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; starting_position="-1", &amp;nbsp;# "-1" is from the beginning of the partition. 
@latest&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;await main()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;-----------------------------------------------&lt;/P&gt;&lt;P&gt;I am able to receive the data in JSON, but the vendor is sending the data in Parquet. What modifications can be made to receive the data in Parquet?&lt;/P&gt;</description>
    <pubDate>Fri, 19 May 2023 12:00:48 GMT</pubDate>
    <dc:creator>Swaroop</dc:creator>
    <dc:date>2023-05-19T12:00:48Z</dc:date>
    <item>
      <title>How to receive data from Azure Event Hub in Parquet?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-receive-data-from-azure-event-hub-in-parquet/m-p/4156#M956</link>
      <description>&lt;P&gt;import asyncio&lt;/P&gt;&lt;P&gt;import os&lt;/P&gt;&lt;P&gt;from azure.eventhub.aio import EventHubConsumerClient&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;CONNECTION_STR = "Connection_string"&lt;/P&gt;&lt;P&gt;EVENTHUB_NAME = "event_hub"&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_event(partition_context, event):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # If the operation is i/o intensive, async will have better performance.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Received event from partition: {}.".format(partition_context.partition_id))&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print(event)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_partition_initialize(partition_context):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Partition: {} has been initialized.".format(partition_context.partition_id))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_partition_close(partition_context, reason):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; print("Partition: {} has been closed, reason for closing: {}.".format(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; partition_context.partition_id,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; reason&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; ))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def on_error(partition_context, error):&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; # Put your code here. 
partition_context can be None in the on_error callback.&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; if partition_context:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("An exception: {} occurred during receiving from Partition: {}.".format(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; partition_context.partition_id,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; error&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ))&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; else:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("An exception: {} occurred during the load balance process.".format(error))&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;async def main():&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; client = EventHubConsumerClient.from_connection_string(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; conn_str=CONNECTION_STR,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; consumer_group="$Default",&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; eventhub_name=EVENTHUB_NAME&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; )&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; async with client:&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; await client.receive(&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_event=on_event,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_error=on_error,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_partition_close=on_partition_close,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on_partition_initialize=on_partition_initialize,&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; starting_position="-1", &amp;nbsp;# "-1" is from the beginning of the partition. 
@latest&lt;/P&gt;&lt;P&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;await main()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;-----------------------------------------------&lt;/P&gt;&lt;P&gt;I am able to receive the data in JSON, but the vendor is sending the data in Parquet. What modifications can be made to receive the data in Parquet?&lt;/P&gt;</description>
      <pubDate>Fri, 19 May 2023 12:00:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-receive-data-from-azure-event-hub-in-parquet/m-p/4156#M956</guid>
      <dc:creator>Swaroop</dc:creator>
      <dc:date>2023-05-19T12:00:48Z</dc:date>
    </item>
  </channel>
</rss>

