cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

How to receive data from azure event hub in parquet ?

Swaroop
New Contributor

import asyncio

import os

from azure.eventhub.aio import EventHubConsumerClient

CONNECTION_STR = "Connection_string"

EVENTHUB_NAME = "event_hub"

async def on_event(partition_context, event):

    # Put your code here.

    # If the operation is i/o intensive, async will have better performance.

    print("Received event from partition: {}.".format(partition_context.partition_id))

    print(event)

async def on_partition_initialize(partition_context):

    # Put your code here.

    print("Partition: {} has been initialized.".format(partition_context.partition_id))

async def on_partition_close(partition_context, reason):

    # Put your code here.

    print("Partition: {} has been closed, reason for closing: {}.".format(

        partition_context.partition_id,

        reason

    ))

async def on_error(partition_context, error):

    # Put your code here. partition_context can be None in the on_error callback.

    if partition_context:

        print("An exception: {} occurred during receiving from Partition: {}.".format(

            partition_context.partition_id,

            error

        ))

    else:

        print("An exception: {} occurred during the load balance process.".format(error))

async def main():

    client = EventHubConsumerClient.from_connection_string(

        conn_str=CONNECTION_STR,

        consumer_group="$Default",

        eventhub_name=EVENTHUB_NAME

    )

    async with client:

        await client.receive(

            on_event=on_event,

            on_error=on_error,

            on_partition_close=on_partition_close,

            on_partition_initialize=on_partition_initialize,

            starting_position="-1",  # "-1" is from the beginning of the partition. @latest

        )

await main()

-----------------------------------------------

I am able to receive the data in json , but the vendor is sending data in parquet . What modifications can be done to receive the data in parquet ?

0 REPLIES 0
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.