Hi,
I have a use case where I need to read JSON files from "/data/json_files/" with a schema enforced.
For completeness, we want to mark the invalid records. An invalid record may be one where a mandatory field is null, a data type does not match, or the JSON itself is invalid.
I have tried the approaches below, but nothing has worked so far. It would be great if someone who has already solved this use case, or is knowledgeable in this area, could help.
Example Schema:
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "tasks",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)
JSON files:
json_1.json
In the second task, "data_col_4" has the wrong data type; in the third, "data_col_2" is mandatory per the schema but is null.
{
    "meta": {
        "id": "abcd1234",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_1": 12,
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 13,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": "Wrong data type"
        },
        {
            "data_col_1": 14,
            "data_col_2": null,
            "data_col_3": 11,
            "data_col_4": 8
        }
    ]
}
json_2.json
"data_col_1" is missing in the first task, although it is mandatory per the schema.
{
    "meta": {
        "id": "efgh5678",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 22,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": 11
        }
    ]
}
PySpark Code:
from pyspark.sql.functions import input_file_name

raw_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
OR
invalid_df = (
    spark.read.schema(schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
Expected Outcome:
All valid records, both in meta and within the tasks array, should be processed, and any invalid record (missing mandatory field, incorrect data type, or invalid JSON) should be flagged as invalid at the individual record level.
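To make the expected marking concrete, here is a plain-Python sketch of the per-record validation I am after (this is just the logic, not Spark; the helper names and MANDATORY/OPTIONAL maps are mine, not an existing API):

```python
import json

# Field -> expected Python type, derived from the schema above.
MANDATORY = {"data_col_1": int, "data_col_2": str}
OPTIONAL = {"data_col_3": int, "data_col_4": int}


def validate_task(task):
    """Return a list of problems for one element of the tasks array."""
    errors = []
    for name, typ in MANDATORY.items():
        if task.get(name) is None:
            errors.append(f"{name} is mandatory but missing or null")
        elif not isinstance(task[name], typ):
            errors.append(f"{name} has wrong type {type(task[name]).__name__}")
    for name, typ in OPTIONAL.items():
        if task.get(name) is not None and not isinstance(task[name], typ):
            errors.append(f"{name} has wrong type {type(task[name]).__name__}")
    return errors


def mark_records(raw_text):
    """Parse one file; mark the whole file or individual tasks as invalid."""
    try:
        doc = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        # Whole file is unparseable: one invalid marker for the file.
        return [{"task": None, "valid": False, "errors": [f"invalid json: {exc}"]}]
    return [
        {"task": task, "valid": not (errs := validate_task(task)), "errors": errs}
        for task in doc.get("tasks", [])
    ]
```

Applied to json_1.json above, this would mark the second task (wrong type for "data_col_4") and the third ("data_col_2" null) as invalid while keeping the first. In Spark, the equivalent checks would presumably run per array element after an explode of tasks.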