Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

PySpark JSON read with strict schema check and mark the valid and invalid records based on the non-nullable fields

sujitmk77
New Contributor II

Hi,

I have a use case where I have to read JSON files from the "/data/json_files/" location with the schema enforced.
For completeness, we want to mark the invalid records. Invalid records may be ones where a mandatory field is null, a data type does not match, or the JSON itself is invalid.

I have tried the approaches below, but nothing has worked so far. It would be great if someone has already solved this use case, or is knowledgeable in this area, and has a solution for it.

 
Example Schema:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, ArrayType

schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
StructField("data_col_4", IntegerType(), True)
                    ]
                )
            ),
            False
        )
    ]
)
 
 
JSON file:
 
json_1.json
 
"data_col_4" is having wrong data type.
"data_col_2" is mandatory as per schema but got null.
 
{
    "meta": {
        "id": "abcd1234",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_1": 12,
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 13,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": "Wrong data type"
        },
        {
            "data_col_1": 14,
            "data_col_2": null,
            "data_col_3": 11,
            "data_col_4": 8
        }
    ]
}
 
json_2.json
 
the "data_col_1" is missing in the tasks.
 
{
    "meta": {
        "id": "efgh5678",
        "timestamp": "2025-02-07T07:59:12.123Z",
        "version": 1
    },
    "tasks": [
        {
            "data_col_2": "Required",
            "data_col_3": 9,
            "data_col_4": 7
        },
        {
            "data_col_1": 22,
            "data_col_2": "Required",
            "data_col_3": 10,
            "data_col_4": 11
        }
    ]
}
 
 
 
PySpark Code:
 
from pyspark.sql.functions import input_file_name

raw_df = (
    spark.read.schema(estate_schema)
    .option("mode", "PERMISSIVE")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
 
OR
 
invalid_df = (
    spark.read.schema(estate_schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)
 
 
Expected Outcome:
All valid records in meta and within the tasks array should be processed, and invalid records (missing mandatory field, incorrect data type, or invalid JSON) should be marked as invalid at the level of the individual record.
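
To make "marked as invalid" concrete, the rough shape I have in mind is something like the following, attaching a reason to each exploded element (only a sketch of the intent, assuming raw_df from the code above; the invalid_reason column name is made up, and this is not code I have working):

from pyspark.sql import functions as F

# Explode the array ("data" as named in the schema above) and attach a reason per element.
flagged_df = (
    raw_df
    .withColumn("element", F.explode_outer("data"))
    .withColumn(
        "invalid_reason",
        F.when(F.col("element").isNull(), "element could not be parsed")
        .when(F.col("element.data_col_1").isNull(), "mandatory data_col_1 is null")
        .when(F.col("element.data_col_2").isNull(), "mandatory data_col_2 is null")
        .otherwise(F.lit(None)),
    )
)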

Alberto_Umana
Databricks Employee

Hi @sujitmk77,

To ensure that valid records are processed while invalid records are marked appropriately, you can use the following PySpark code. It reads the JSON files with schema enforcement and handles invalid records by marking them as corrupt.

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    TimestampType,
    IntegerType,
    ArrayType,
)

# Define the schema
estate_schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
    ]
)

# Read the JSON files with schema enforcement and handle invalid records
invalid_df = (
    spark.read.schema(estate_schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)

# Show the DataFrame with invalid records marked
invalid_df.show(truncate=False)
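
As a follow-up, two points are worth keeping in mind here: _corrupt_record is only populated when that column is also present in the schema passed to the reader, and nullable=False in a user-supplied schema is not enforced when reading JSON, so nulls in mandatory fields have to be checked explicitly. Below is a sketch of how both could be combined into a per-element flag (the is_valid column name is only illustrative):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField

# The corrupt-record column must be part of the read schema to be populated.
schema_with_corrupt = estate_schema.add(StructField("_corrupt_record", StringType(), True))

df = (
    spark.read.schema(schema_with_corrupt)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)

# Explode the array and flag each element: valid only if the file parsed cleanly
# and all mandatory (nullable=False) fields are present and non-null.
marked_df = (
    df.withColumn("element", F.explode_outer("data"))
    .withColumn(
        "is_valid",
        F.col("_corrupt_record").isNull()
        & F.col("meta.id").isNotNull()
        & F.col("meta.timestamp").isNotNull()
        & F.col("meta.version").isNotNull()
        & F.col("element.data_col_1").isNotNull()
        & F.col("element.data_col_2").isNotNull(),
    )
)

marked_df.select("src_filename", "element.*", "is_valid").show(truncate=False)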

Hi @Alberto_Umana,

There was a typo in the schema name; it should be "estate_schema".

However, the issue remains the same. I do not see any difference between my code and the code you have provided. Let me know if it is otherwise.
