10-01-2022 03:15 AM
Here are the details of the requirement:
1. I am using a Databricks notebook to read data from a Kafka topic and write it into an ADLS Gen2 container, i.e., my landing layer.
2. I am using Spark code to read the data from Kafka and write it into the landing layer.
3. In the next step, I read the JSON files from the landing layer and move them to the bronze layer, which is another container in my ADLS Gen2 account. For this, I am using Auto Loader with a Delta Live Tables table.
Here is the code:
import dlt

# tablename, schemalocation and sourcelocation are defined elsewhere in the notebook.
@dlt.table(
    name = tablename,
    comment = "Create Bronze Table",
    table_properties={
        "quality": "bronze"
    }
)
def Bronze_Table_Create():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schemalocation)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .load(sourcelocation)
    )
4. This code works fine for me, and it infers the schema as well. However, there is one scenario I am trying to handle, described step by step below:
i. I want to validate the schema so that, if the schema changes, I get notified and the job fails. Normally I could handle this through schemaEvolutionMode. However, my scenario is different: I have one column, RawData, which is of type object and has no specified schema. It receives dynamic values, so if I infer the schema and apply schema validation, every run will bring a new schema and throw a schema mismatch error.
ii. Is there any solution by which I can exclude the RawData column from schema validation, so that this column is allowed to hold any type of data?
I have been struggling with this for a long time. Any help would be appreciated. Please let me know if any additional details are required.
Sample Json (the data in RawData is inconsistent, i.e., it can have different columns from one event to the next):
{
  "Header": {
    "SchemaVersion": "1.0",
    "EventId": "123",
    "EventTime_UTC": "2022-09-22 16:18:16",
    "Environment": "dev"
  },
  "Payload": {
    "RawData": {
      "CusID": "12345",
      "Status": "Pending",
      "LastModifiedAt": "2022-09-22 16:18:12",
      "ContainerName": "default",
      "CreatedAt": "2022-09-22 16:18:11"
    },
    "Data": {
      "CustID": "12345",
      "ArrayKeys": [
        {
          "ArrayName": "WorkHistory",
          "ArrayKeyName": "SampleId"
        }
      ]
    }
  }
}
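One direction that may be worth trying for the question above, sketched here under assumptions rather than as a confirmed fix: Auto Loader documents a cloudFiles.schemaHints option and a failOnNewColumns evolution mode. Hinting Payload.RawData as STRING should make Spark keep that object as raw JSON text, so new keys inside it would not count as new columns, while new fields elsewhere would still fail the stream. The names AUTOLOADER_OPTIONS and read_landing are hypothetical, and the behaviour should be verified on your own runtime.

```python
# Sketch only: option names come from the Auto Loader documentation, but the
# combination below is an assumption and has not been confirmed in this thread.
AUTOLOADER_OPTIONS = {
    "cloudFiles.format": "json",
    "cloudFiles.inferColumnTypes": "true",
    # Fail the stream when a column outside the known schema appears.
    "cloudFiles.schemaEvolutionMode": "failOnNewColumns",
    # Treat the dynamic object as an opaque JSON string instead of a struct.
    "cloudFiles.schemaHints": "Payload.RawData STRING",
}


def read_landing(spark, source_location, schema_location):
    """Build the Auto Loader stream (hypothetical helper, not from the post)."""
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**AUTOLOADER_OPTIONS)
        .option("cloudFiles.schemaLocation", schema_location)
        .load(source_location)
    )
```

Inside the DLT function from the post, the body would then become `return read_landing(spark, sourcelocation, schemalocation)`.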
10-02-2022 11:37 AM
Maybe don't validate the schema at ingestion; then, in the next step, use a DLT expectation to check that all required fields are present in the Data struct.
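As a rough illustration of that suggestion, the expectations do not have to be written one by one: they can be generated from a list of field names and passed as a dictionary to dlt.expect_all_or_fail. The field list and the helper name build_required_field_expectations below are assumptions made for this sketch, based only on the sample JSON in the question.

```python
# Hypothetical list of required fields, taken from the sample JSON's Data struct.
REQUIRED_DATA_FIELDS = ["CustID", "ArrayKeys"]


def build_required_field_expectations(fields, parent="Payload.Data"):
    """Map an expectation name to a SQL constraint for each required field."""
    prefix = parent.replace(".", "_")
    return {
        f"required_{prefix}_{name}": f"{parent}.{name} IS NOT NULL"
        for name in fields
    }


expectations = build_required_field_expectations(REQUIRED_DATA_FIELDS)

# In a DLT pipeline the dictionary could then be applied to the next table:
#   @dlt.table(name="silver_events")          # table name is illustrative
#   @dlt.expect_all_or_fail(expectations)
#   def silver_events(): ...
```

Because the dictionary is built from a list, adding a column to the check means adding one string rather than one decorator.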
10-02-2022 09:29 PM
Hi @Hubert Dudek, I actually want to validate the schema so that I know when the data contains any additional fields beyond the expected schema. If I do the expectation check at the next level, I need to apply a check for each individual column, and there are many columns, so it will be difficult to maintain. Is there any way to exclude a particular column, like RawData in my case, from schema enforcement, so that enforcement does not apply to the RawData column, which receives unspecified or dynamic data?
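To make the requirement concrete, here is a minimal plain-Python sketch of the check being asked for: collect the dotted field paths of a record, ignore everything under Payload.RawData, and report any path that is not in the expected set. This is not an Auto Loader feature; the function names and EXPECTED_PATHS are hypothetical, and the expected paths are read off the sample JSON in the question.

```python
# Expected field paths, derived from the sample JSON (an assumption of this sketch).
EXPECTED_PATHS = {
    "Header.SchemaVersion", "Header.EventId", "Header.EventTime_UTC",
    "Header.Environment", "Payload.Data.CustID",
    "Payload.Data.ArrayKeys.ArrayName", "Payload.Data.ArrayKeys.ArrayKeyName",
}


def field_paths(value, prefix=""):
    """Yield the dotted path of every leaf field in a parsed JSON value."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from field_paths(child, f"{prefix}.{key}" if prefix else key)
    elif isinstance(value, list):
        for item in value:  # array elements share the parent's path
            yield from field_paths(item, prefix)
    else:
        yield prefix


def unexpected_fields(record, expected=EXPECTED_PATHS,
                      ignored_prefixes=("Payload.RawData",)):
    """Return sorted paths that are neither expected nor under an ignored prefix."""
    def ignored(path):
        return any(path == p or path.startswith(p + ".") for p in ignored_prefixes)
    return sorted(
        path for path in set(field_paths(record))
        if path not in expected and not ignored(path)
    )
```

A record whose only differences are inside RawData yields an empty list, while a new field such as Header.Region is reported, which is the point at which the job could be failed or a notification sent.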
10-28-2022 11:16 PM
Hi @Swapnil Kamle
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!
12-01-2022 04:56 PM
Sorry for the delay in replying. I didn't get an exact answer.
10-24-2024 10:01 PM
Just to clarify, are you reading from Kafka and writing into ADLS as JSON files? That is, does each Kafka message become one JSON file in ADLS?