Following are the details of the requirement:
1. I am using a Databricks notebook to read data from a Kafka topic and write it into an ADLS Gen2 container, i.e., my landing layer.
2. I am using Spark code to read the data from Kafka and write it into the landing layer.
3. Next, I read the JSON files from the landing layer and move them into the bronze layer, which is another container in my ADLS Gen2 account. For this purpose, I am using Auto Loader with Delta Live Tables to create the table.
Here is the code for the same:
import dlt

@dlt.table(
    name=tablename,
    comment="Create Bronze Table",
    table_properties={
        "quality": "bronze"
    }
)
def Bronze_Table_Create():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schemalocation)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .load(sourcelocation)
    )
4. This code works fine for me and infers the schema as well. However, there is one scenario I am trying to handle, which I describe step by step below:
i. I want to validate the schema so that if it changes, I get notified and the job fails. Normally I could handle that through cloudFiles.schemaEvolutionMode. However, my scenario is a bit different: I have one column, RawData, which is of type object and has no fixed schema. It receives dynamic values, so if I infer the schema and apply schema validation, every run can produce a new schema and throw a schema-mismatch error.
ii. Is there any solution by which I can exclude the RawData column from schema validation, so that this column is allowed to contain any type of data?
I have been struggling with this for a long time. Any help would be appreciated. Please let me know if any additional details are required.
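One idea I have been exploring (just a sketch, and I am assuming that cloudFiles.schemaHints accepts dot notation for nested fields, as the Auto Loader schema docs describe) is to pin Payload.RawData to a plain STRING via a schema hint, so inference never produces a changing struct for it, and to switch the evolution mode to failOnNewColumns so any other schema change still fails the job:

```python
import dlt

@dlt.table(
    name=tablename,
    comment="Create Bronze Table",
    table_properties={"quality": "bronze"}
)
def Bronze_Table_Create():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schemalocation)
        .option("cloudFiles.inferColumnTypes", "true")
        # Keep RawData as raw JSON text so inference never sees its
        # changing inner fields; it can be parsed downstream when needed.
        .option("cloudFiles.schemaHints", "Payload.RawData STRING")
        # Fail the stream when any OTHER column appears or changes.
        .option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
        .load(sourcelocation)
    )
```

I have not fully verified this in my pipeline yet, so corrections are welcome.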
Sample JSON (note: the data in RawData is inconsistent, i.e. it can have different columns from event to event):
{
    "Header": {
        "SchemaVersion": "1.0",
        "EventId": "123",
        "EventTime_UTC": "2022-09-22 16:18:16",
        "Environment": "dev"
    },
    "Payload": {
        "RawData": {
            "CusID": "12345",
            "Status": "Pending",
            "LastModifiedAt": "2022-09-22 16:18:12",
            "ContainerName": "default",
            "CreatedAt": "2022-09-22 16:18:11"
        },
        "Data": {
            "CustID": "12345",
            "ArrayKeys": [
                {
                    "ArrayName": "WorkHistory",
                    "ArrayKeyName": "SampleId"
                }
            ]
        }
    }
}
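For what it's worth, here is a minimal pure-Python sketch of what I mean by "excluding RawData from validation" (the EXPECTED map and the validate helper are purely illustrative, not part of my pipeline): it checks the key structure of an event but deliberately skips Payload.RawData, so its free-form contents can never trigger a mismatch.

```python
import json

# Hypothetical expected structure; RawData is intentionally absent so its
# contents are never validated.
EXPECTED = {
    "Header": {"SchemaVersion", "EventId", "EventTime_UTC", "Environment"},
    "Payload": {"Data"},  # RawData is excluded from validation
}

def validate(event: dict) -> list:
    """Return a list of schema violations, ignoring Payload.RawData."""
    errors = []
    for section, expected_keys in EXPECTED.items():
        # Drop RawData before comparing, so its presence/absence and
        # ever-changing inner fields are never flagged.
        actual = set(event.get(section, {})) - {"RawData"}
        missing = expected_keys - actual
        extra = actual - expected_keys
        if missing:
            errors.append(f"{section}: missing {sorted(missing)}")
        if extra:
            errors.append(f"{section}: unexpected {sorted(extra)}")
    return errors

sample = json.loads("""
{
  "Header": {"SchemaVersion": "1.0", "EventId": "123",
             "EventTime_UTC": "2022-09-22 16:18:16", "Environment": "dev"},
  "Payload": {"RawData": {"AnyField": 1, "AnotherField": "x"},
              "Data": {"CustID": "12345"}}
}
""")

print(validate(sample))  # RawData's changing fields cause no errors
```

This is the behaviour I want the bronze ingestion to have: strict on everything except RawData.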