How to handle schema validation for JSON files using Databricks Autoloader?

SRK
Databricks Partner

Following are the details of the requirement:

1. I am using a Databricks notebook to read data from a Kafka topic and write it to an ADLS Gen2 container, which is my landing layer.

2. I use Spark code to read from Kafka and write to the landing layer.

3. The next step is to read the JSON files from the landing layer and move them to the bronze layer, which is another container in my ADLS Gen2 storage. For this, I am using Autoloader in a Delta Live Tables pipeline to create the bronze table.

Here is the code:

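import dlt

# tablename, schemalocation and sourcelocation are defined elsewhere in the notebook.
@dlt.table(
    name=tablename,
    comment="Create Bronze Table",
    table_properties={"quality": "bronze"},
)
def Bronze_Table_Create():
    # Incrementally ingest JSON files from the landing layer with Autoloader.
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schemalocation)
        # Infer column types instead of reading every column as a string.
        .option("cloudFiles.inferColumnTypes", "true")
        # Keep the schema fixed; unexpected data goes to the rescued data column.
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .load(sourcelocation)
    )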

4. This code works fine and infers the schema as well. However, there is one scenario that I am trying to handle, described step by step below:

i. I want to validate the schema so that if it changes, I get notified and the job fails. I could handle that through schemaEvolutionMode. However, my scenario is different: I have one column, RawData, which is of type object and has no fixed schema. Its contents are dynamic, so if I infer the schema and apply schema validation, each new file can bring a new schema and trigger a schema mismatch error.

ii. Is there any solution by which I can exclude the RawData column from schema validation, so that this column is allowed to hold any type of data?

I have been struggling with this for a long time. Any help would be appreciated. Please let me know if any additional details are required.

Sample JSON:

{
  "Header": {
    "SchemaVersion": "1.0",
    "EventId": "123",
    "EventTime_UTC": "2022-09-22 16:18:16",
    "Environment": "dev"
  },
  "Payload": {
    "RawData": {
      "CusID": "12345",
      "Status": "Pending",
      "LastModifiedAt": "2022-09-22 16:18:12",
      "ContainerName": "default",
      "CreatedAt": "2022-09-22 16:18:11"
    },
    "Data": {
      "CustID": "12345",
      "ArrayKeys": [
        {
          "ArrayName": "WorkHistory",
          "ArrayKeyName": "SampleId"
        }
      ]
    }
  }
}

Note: the data in RawData is inconsistent; it can have different columns from one event to the next.
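
To make the requirement in step 4 concrete, here is a minimal sketch in plain Python (not Autoloader code) of the check I am after: every field is validated against an expected layout except Payload.RawData, which is skipped. The names ANY, EXPECTED and find_mismatches are hypothetical, made up for this illustration, and the expected layout is simply taken from the sample JSON above.

```python
import json

# Hypothetical marker: any subtree mapped to ANY is excluded from validation.
ANY = object()

# Assumed expected layout, copied from the sample JSON. Nested dicts mirror
# JSON objects; a one-element list describes the items of an array.
EXPECTED = {
    "Header": {
        "SchemaVersion": str,
        "EventId": str,
        "EventTime_UTC": str,
        "Environment": str,
    },
    "Payload": {
        "RawData": ANY,  # dynamic content, never validated
        "Data": {
            "CustID": str,
            "ArrayKeys": [{"ArrayName": str, "ArrayKeyName": str}],
        },
    },
}


def find_mismatches(value, expected, path="$"):
    """Return a list of schema mismatches; an empty list means valid."""
    if expected is ANY:
        return []
    if isinstance(expected, dict):
        if not isinstance(value, dict):
            return [f"{path}: expected object"]
        problems = []
        for key in expected:
            if key not in value:
                problems.append(f"{path}.{key}: missing field")
            else:
                problems += find_mismatches(value[key], expected[key], f"{path}.{key}")
        for key in value:
            if key not in expected:
                problems.append(f"{path}.{key}: unexpected field")
        return problems
    if isinstance(expected, list):
        if not isinstance(value, list):
            return [f"{path}: expected array"]
        problems = []
        for i, item in enumerate(value):
            problems += find_mismatches(item, expected[0], f"{path}[{i}]")
        return problems
    if not isinstance(value, expected):
        return [f"{path}: expected {expected.__name__}"]
    return []


event = json.loads("""
{
  "Header": {"SchemaVersion": "1.0", "EventId": "123",
             "EventTime_UTC": "2022-09-22 16:18:16", "Environment": "dev"},
  "Payload": {
    "RawData": {"CusID": "12345", "Status": "Pending"},
    "Data": {"CustID": "12345",
             "ArrayKeys": [{"ArrayName": "WorkHistory", "ArrayKeyName": "SampleId"}]}
  }
}
""")

# The shape of RawData does not matter, so this event has no mismatches.
print(find_mismatches(event, EXPECTED))
```

In this sketch, a new or missing field anywhere outside RawData produces a mismatch (the case where I want the job to fail), while any change inside RawData is ignored. What I am looking for is the equivalent behaviour in Autoloader.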