
Avro Deserialization from Event Hub capture and Autoloader

Gilg
Contributor II

Hi All,

I am getting data from Event Hub capture in Avro format and using Auto Loader to process it.

I got to the point where I can read the Avro by casting the Body column to a string.

Now I want to deserialize the Body column so it is in table format. I managed to do this by constructing a json_schema using StructType() and passing it to the from_json() function, then doing a writeStream into a Delta table.
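For reference, a minimal sketch of that approach; the capture path, checkpoint and schema locations, schema fields, and table name are all hypothetical:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for the JSON payload inside Body
json_schema = StructType([
    StructField("deviceId", StringType()),
    StructField("reading", LongType()),
])

# Auto Loader over the Event Hubs capture files (Avro)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "avro")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/schema")  # hypothetical
    .load("/mnt/eventhub/capture"))  # hypothetical capture path

# Cast the binary Body to string and parse it into columns
parsed = (df
    .withColumn("data", from_json(col("Body").cast("string"), json_schema))
    .select("data.*"))

(parsed.writeStream
    .option("checkpointLocation", "/mnt/checkpoints/capture")  # hypothetical
    .toTable("bronze_events"))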


Question: is there a way I can deserialize the Avro data without constructing a schema? The Event Hubs schema registry is one option I am looking at, but I am not sure how to use it within Auto Loader using PySpark.

Cheers,

Gil

4 REPLIES

UmaMahesh1
Honored Contributor III

Does your cluster have a schema registry service? If yes, you do not need to provide the schema explicitly.

Uma Mahesh D

UmaMahesh1
Honored Contributor III

If you still want to go with the above approach and don't want to provide the schema manually, you can fetch a tiny batch with one record and capture its schema in a variable using the .schema attribute. Once that's done, you can add a new Body column by passing that schema to from_json in the same way. Since the result will be in JSON (struct) format, you can then use the explode function and work from there, as in the sketch below.
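In code, that could look something like this (a rough sketch; df is the streaming DataFrame from above and the capture path is hypothetical):

from pyspark.sql.functions import col, explode, from_json

# One-record batch read of the capture files to infer the payload schema
sample = (spark.read.format("avro")
    .load("/mnt/eventhub/capture")  # hypothetical path
    .select(col("Body").cast("string"))
    .limit(1))
json_schema = spark.read.json(sample.rdd.map(lambda r: r.Body)).schema

# Reuse the inferred schema on the stream; explode any array fields as needed
parsed = df.withColumn("data", from_json(col("Body").cast("string"), json_schema))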

Uma Mahesh D

Gilg
Contributor II

Hi @Uma Maheswara Rao Desula,

If I do .schema, even for a tiny batch, it will only get the schema of the original Avro, i.e. the system-generated columns plus the Body column that holds the actual data.


UmaMahesh1
Honored Contributor III

You can build your schema like this:

from pyspark.sql.functions import col, from_json

# Infer the JSON schema from the string payload of an existing DataFrame
schema = spark.read.json(capture_tmp.rdd.map(lambda row: row.columnNamewithValue)).schema

Then apply it to parse the column:

df = df.withColumn('new_col', from_json(col('yourcolumnname'), schema))

Uma Mahesh D
