PySpark JSON read with strict schema check and mark the valid and invalid records based on the non-n
02-07-2025 12:29 AM
Hi,
I have a use case where I need to read JSON files from the "/data/json_files/" location with a schema enforced.
For completeness, we want to mark the invalid records. A record is invalid if a mandatory field is null, a value does not match the declared data type, or the JSON itself is malformed.
I have tried a few approaches, but nothing has worked so far. I'd appreciate help from anyone who has already solved this use case or knows this area well.
02-07-2025 05:11 AM
Hi @sujitmk77,
To make sure valid records are processed while invalid records are marked appropriately, you can use the following PySpark code. It reads the JSON files with schema enforcement and handles invalid records by marking them as corrupt:
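```python
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Define the schema. File-based reads treat every field as nullable,
# so the False flags below document intent but are not enforced.
estate_schema = StructType(
    [
        StructField(
            "meta",
            StructType(
                [
                    StructField("id", StringType(), False),
                    StructField("timestamp", TimestampType(), False),
                    StructField("version", IntegerType(), False),
                ]
            ),
            False,
        ),
        StructField(
            "data",
            ArrayType(
                StructType(
                    [
                        StructField("data_col_1", IntegerType(), False),
                        StructField("data_col_2", StringType(), False),
                        StructField("data_col_3", IntegerType(), True),
                        StructField("data_col_4", IntegerType(), True),
                    ]
                )
            ),
            False,
        ),
        # PERMISSIVE mode only keeps the raw text of a malformed record when the
        # schema has a nullable StringType field named like columnNameOfCorruptRecord
        StructField("_corrupt_record", StringType(), True),
    ]
)

# Read the JSON files with schema enforcement; records that fail to parse
# (malformed JSON or a type mismatch) get their raw text in _corrupt_record
invalid_df = (
    spark.read.schema(estate_schema)  # `spark` is the notebook's SparkSession
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .option("multiline", "true")
    .json("/data/json_files/")
    .withColumn("src_filename", input_file_name())
)

# Show the DataFrame with invalid records marked
invalid_df.show(truncate=False)
```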
02-08-2025 11:47 AM
Hi @Alberto_Umana,
There was a typo in the schema name; it should be "estate_schema".
However, the issue remains the same: I don't see any difference between my code and the code you provided. Let me know if I'm missing something.

