Avro Deserialization from Event Hub capture and Autoloader
12-13-2022 09:07 PM
Hi All,
I am getting data from Event Hub capture in Avro format and using Auto Loader to process it.
I have got to the point where I can read the Avro by casting the Body into a string.
Now I want to deserialize the Body column so it will be in table format. I managed to do this by constructing a json_schema using StructType(), passing the json_schema to the from_json() function, and then doing a writeStream into a Delta table.
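Roughly what I have so far (the paths, table name, and payload schema below are placeholders for my actual ones):

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Placeholder schema for the JSON payload inside Body
json_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
])

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "avro")
      .option("cloudFiles.schemaLocation", "/mnt/capture/_schema")  # placeholder
      .load("/mnt/capture/landing/"))  # placeholder capture path

parsed = (df
          .withColumn("Body", col("Body").cast("string"))
          .withColumn("payload", from_json(col("Body"), json_schema)))

(parsed.select("payload.*")
 .writeStream
 .option("checkpointLocation", "/mnt/capture/_checkpoint")  # placeholder
 .toTable("capture_events"))  # placeholder table name
```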
Question: is there a way to deserialize the Avro data without constructing the schema by hand? The Event Hub schema registry is one option I am looking at, but I am not sure how to use it within Auto Loader with PySpark.
Cheers,
Gil
- Labels: Autoloader, Avro, Delta table, Eventhub, JSON, Pyspark
12-13-2022 09:37 PM
Does your cluster have access to a schema registry service? If yes, you need not provide the schema explicitly.
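For instance, a rough sketch using the Azure Schema Registry client (this assumes the azure-schemaregistry package is installed on the cluster and that the Body bytes are Avro-encoded with the registered schema; the namespace and schema ID are placeholders, and attribute names can vary between package versions):

```python
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col

# Fetch the registered Avro schema definition (namespace/ID are placeholders)
registry = SchemaRegistryClient(
    fully_qualified_namespace="mynamespace.servicebus.windows.net",
    credential=DefaultAzureCredential(),
)
avro_schema_str = registry.get_schema("my-schema-id").definition

# Decode the Avro-encoded Body with the fetched schema
decoded = df.withColumn("payload", from_avro(col("Body"), avro_schema_str))
```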
12-13-2022 09:43 PM
If you still want to go with the above approach and don't want to provide the schema manually, you can fetch a tiny batch with one record and capture its schema in a variable using the .schema attribute. Once done, you can add a new Body column by passing the schema in that variable to from_json in the same way. Since the result will be in JSON format, you can then use the explode function and work with the nested fields.
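A rough sketch of that idea (the path is a placeholder):

```python
# Static read of a single record from the capture location
tiny = (spark.read.format("avro")
        .load("/mnt/capture/landing/")  # placeholder path
        .limit(1))

# Capture the schema into a variable for reuse
batch_schema = tiny.schema
```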
12-14-2022 06:03 PM
Hi @Uma Maheswara Rao Desula,
If I use .schema, even on a tiny batch, it only returns the schema of the original Avro file, i.e. the generated system columns plus the Body column that holds the actual data.
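For context, the capture file's own schema looks roughly like this (the standard Event Hub capture layout), which is why .schema alone doesn't get at the payload:

```python
df.printSchema()
# root
#  |-- SequenceNumber: long (nullable = true)
#  |-- Offset: string (nullable = true)
#  |-- EnqueuedTimeUtc: string (nullable = true)
#  |-- SystemProperties: map (nullable = true)
#  |-- Properties: map (nullable = true)
#  |-- Body: binary (nullable = true)
```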
12-14-2022 10:10 PM
You can build your schema like this (replace the column names with yours):

```python
from pyspark.sql.functions import col, from_json

# Infer the payload schema from the JSON strings in your column
schema = spark.read.json(capture_tmp.rdd.map(lambda row: row.columnNamewithValue)).schema
```

Then:

```python
df = df.withColumn('new_col', from_json(col('yourcolumnname'), schema))
```
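Putting it together with the Auto Loader stream from earlier in the thread, a sketch (the path and selected columns are placeholders):

```python
from pyspark.sql.functions import col, from_json

# One-off static read of the capture files to infer the payload schema
capture_tmp = (spark.read.format("avro")
               .load("/mnt/capture/landing/")  # placeholder capture path
               .withColumn("Body", col("Body").cast("string")))
schema = spark.read.json(capture_tmp.rdd.map(lambda row: row.Body)).schema

# Apply the inferred schema to the streaming DataFrame
parsed = (df
          .withColumn("payload", from_json(col("Body").cast("string"), schema))
          .select("SequenceNumber", "EnqueuedTimeUtc", "payload.*"))
```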

