Avro Deserialization from Event Hub capture and Autoloader

Gilg
Contributor II

Hi All,

I am getting data from Event Hub capture in Avro format and using Auto Loader to process it.

I got to the point where I can read the Avro data by casting the Body column to a string.

Now I want to deserialize the Body column so that it ends up in table format. I managed to do this by constructing a json_schema using StructType() and passing it to the from_json() function, after which I do a writeStream into a Delta table.
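
Since the screenshots did not survive, here is a minimal sketch of what that approach might look like. The payload fields (deviceId, temperature, eventTime), the function names, and all paths are hypothetical placeholders, not the actual code from the post; it assumes a Databricks runtime where the cloudFiles source is available.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # type hints only; Spark itself is imported inside the functions
    from pyspark.sql import DataFrame, SparkSession


def read_capture_as_table(
    spark: "SparkSession", source_path: str, schema_location: str
) -> "DataFrame":
    """Read Event Hubs Capture Avro files with Auto Loader and flatten Body."""
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    # Hypothetical payload layout: replace with the real fields carried in Body.
    json_schema = StructType([
        StructField("deviceId", StringType(), True),
        StructField("temperature", DoubleType(), True),
        StructField("eventTime", StringType(), True),
    ])

    # Auto Loader infers the Avro envelope schema and tracks it in schema_location.
    raw = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "avro")
        .option("cloudFiles.schemaLocation", schema_location)
        .load(source_path)
    )

    # Body arrives as binary: cast to string, parse the JSON, then expand the struct.
    return (
        raw.withColumn("Body", col("Body").cast("string"))
        .withColumn("payload", from_json(col("Body"), json_schema))
        .select("payload.*")
    )


def write_to_delta(df: "DataFrame", checkpoint_path: str, target_table: str):
    """Start a streaming write of the flattened rows into a Delta table."""
    return (
        df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .toTable(target_table)
    )
```

In a notebook this would be called roughly as write_to_delta(read_capture_as_table(spark, "<capture path>", "<schema path>"), "<checkpoint path>", "<table name>"), with every argument supplied by you.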

Question: is there a way to deserialize the Avro data without constructing a schema? Event Hub schema registry is one option that I am looking at, but I am not sure how to use it within Auto Loader using PySpark.

Cheers,

Gil

UmaMahesh1
Honored Contributor III

Does your cluster have a schema registry service? If yes, you need not provide the schema explicitly.

Uma Mahesh D

UmaMahesh1
Honored Contributor III

If you still want to go with the above approach and don't want to provide the schema manually, you can fetch a tiny batch with one record and capture its schema in a variable using .schema. Once done, you can add a new column parsed from Body by passing that variable to from_json in the same way. Since the data is in JSON format, you can then use the explode function and continue processing from there.

Uma Mahesh D

Hi @Uma Maheswara Rao Desula,

If I use .schema, even on a tiny batch, it only returns the schema of the original Avro file, i.e. the generated system columns plus the Body column that holds the actual data.
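
To illustrate why, here is a plain-Python sketch of one captured record. The field names follow the Event Hubs Capture Avro envelope; the values and the payload fields are made up for the example.

```python
import json

# One captured record: fixed envelope fields plus an opaque Body of bytes.
captured_record = {
    "SequenceNumber": 42,
    "Offset": "4096",
    "EnqueuedTimeUtc": "1/1/2023 12:00:00 AM",
    "SystemProperties": {},
    "Properties": {},
    "Body": b'{"deviceId": "sensor-1", "temperature": 21.5}',  # hypothetical payload
}

# The Avro schema only describes the envelope; Body is typed as bytes.
envelope_fields = sorted(captured_record)

# The payload structure becomes visible only after decoding and parsing Body.
payload = json.loads(captured_record["Body"].decode("utf-8"))
payload_fields = sorted(payload)
```

So the schema of the file stops at Body, and the payload fields have to come from parsing the Body contents themselves.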

UmaMahesh1
Honored Contributor III

You can build your schema like this...

schema = spark.read.json(capture_tmp.rdd.map(lambda row: row.columnNamewithValue)).schema

Then, df.withColumn('new_col', from_json(col('yourcolumnname'), schema))
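
Put together, the two steps above might look like the sketch below. The function names, sample_path, the payload column name, and the 100-row sample size are assumptions for illustration; it also assumes Body holds UTF-8 JSON and that the Avro reader is available (built in on Databricks, the spark-avro package elsewhere).

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # type hints only; Spark itself is imported inside the functions
    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql.types import StructType


def infer_body_schema(
    spark: "SparkSession", sample_path: str, body_col: str = "Body"
) -> "StructType":
    """Infer the JSON schema of the Body column from a small batch read."""
    from pyspark.sql.functions import col

    # Batch-read a few capture files and keep only Body, cast from binary to string.
    sample = (
        spark.read.format("avro")
        .load(sample_path)
        .select(col(body_col).cast("string").alias("json"))
        .limit(100)
    )

    # spark.read.json accepts an RDD of JSON strings and infers a StructType.
    return spark.read.json(sample.rdd.map(lambda row: row[0])).schema


def parse_body(
    df: "DataFrame", schema: "StructType", body_col: str = "Body"
) -> "DataFrame":
    """Parse Body with the inferred schema and expand it into columns."""
    from pyspark.sql.functions import col, from_json

    parsed = df.withColumn("payload", from_json(col(body_col).cast("string"), schema))
    return parsed.select("payload.*")
```

The inferred schema only covers fields that appear in the sampled records, so a field that first shows up later in the stream will not be parsed until the schema is inferred again.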

Uma Mahesh D