This blog demonstrates Auto Loader's advanced data quality detection capabilities for JSON files.
In our previous blog post, "Introduction: Auto Loader as the Go-To Ingestion Solution", we explored Auto Loader's core capabilities for reliable data ingestion, focusing on CSV format.
This post explores Auto Loader's advanced data quality features, focusing specifically on JSON file corruption detection and validation.
Auto Loader's sophisticated data quality detection transforms potential pipeline failures into manageable data audit opportunities, making it an essential tool for production data environments.
JSON files from various upstream systems often contain structural and data quality issues that can break traditional ingestion pipelines. Consider a standard product record with an expected schema:
{"id": 101, "product": {"name": "Wireless Mouse", "category": "Electronics"}, "quantity": 10, "tags": ["wireless", "usb"]}
Among many others, common corruption patterns include:
Invalid JSON syntax (missing quotes and brackets): {id: 102, product: {name: Gaming Keyboard, category: Electronics}}
Extra fields beyond the expected schema: "extra_property": "bonus_data" or "unexpected_field": "additional_info"
Type mismatches: "quantity": "N/A" instead of a number, or "category": 123 instead of a string
Malformed arrays: "tags": [unclosed, array}
Invalid escape sequences: "name": "Phone Case\\invalid"
Auto Loader's built-in quality detection capabilities handle these challenges gracefully across all supported formats, with JSON requiring special attention due to its flexible, nested structure.
Auto Loader provides comprehensive, built-in JSON validation, making it ideal for JSON-heavy data lakes where schema flexibility and data quality monitoring are critical.
Auto Loader offers two complementary approaches for JSON quality detection:
_rescued_data: Automatically captures non-conforming nested data when schemaEvolutionMode is set to "rescue". Ideal for preserving partial information from complex JSON structures.
_corrupt_record: Stores completely unparseable JSON records when explicitly enabled via schema hints. Essential for debugging malformed JSON syntax.
For production environments prioritizing data continuity, _rescued_data provides the optimal balance: minimal overhead while preserving the parseable portions of each record.
When comprehensive audit trails and detailed debugging capabilities are required, _corrupt_record becomes essential despite requiring additional schema hints, as it captures complete record failures for thorough analysis.
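The sketch below is a minimal illustration of how each approach is switched on in the Auto Loader reader options; the input path, schema variable, and schema location are placeholders, and complete end-to-end examples follow in the scenarios later in this post.
# Minimal sketch: enabling each detection approach (paths and schema are placeholders)

# Approach 1: rescue mode with a predefined schema -> non-conforming data lands in _rescued_data
rescue_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .schema(expected_schema)  # your predefined JSON schema
    .load("/path/to/json/input/"))

# Approach 2: corrupt record capture via schema hints -> unparseable records land in _corrupt_record
corrupt_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema/")
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")
    .load("/path/to/json/input/"))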
The following examples demonstrate Auto Loader's handling of JSON-specific corruption scenarios in a controlled environment. Auto Loader provides similar capabilities for CSV, XML, Parquet, and other supported formats.
import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
# Create unique path for experiment
path = f"/tmp/Auto Loader_json_quality/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)
# Expected schema for JSON validation
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product", StructType([
        StructField("name", StringType(), True),
        StructField("category", StringType(), True)
    ]), True),
    StructField("quantity", IntegerType(), True),
    StructField("tags", ArrayType(StringType()), True)
])
# Five JSON corruption scenarios:
# 1. Invalid JSON Syntax - missing quotes and brackets (Detectable via Corrupt Record)
invalid_syntax = '''{"id": 101, "product": {"name": "Wireless Mouse", "category": "Electronics"}, "quantity": 10, "tags": ["wireless", "usb"]}
{id: 102, product: {name: Gaming Keyboard, category: Electronics}, quantity: 15, tags: [gaming, rgb]}'''
# 2. Extra Nested Fields - additional properties beyond expected schema (Detectable via Rescue)
extra_fields = '''{"id": 201, "product": {"name": "Monitor Stand", "category": "Electronics", "extra_property": "bonus_data"}, "quantity": 5, "tags": ["desk", "ergonomic"]}
{"id": 202, "product": {"name": "Desk Pad", "category": "Electronics"}, "quantity": 8, "tags": ["portable"], "unexpected_field": "additional_info"}'''
# 3. Type Mismatch in Nested Objects - wrong data types (Detectable via Rescue)
type_mismatch = '''{"id": 301, "product": {"name": "USB Hub", "category": "Electronics"}, "quantity": "N/A", "tags": ["usb", "hub"]}
{"id": 302, "product": {"name": "Webcam", "category": 123}, "quantity": 3, "tags": "single-tag"}'''
# 4. Malformed Nested Arrays - invalid array syntax (Detectable via Corrupt Record)
malformed_arrays = '''{"id": 401, "product": {"name": "Power Bank", "category": "Electronics"}, "quantity": 12, "tags": ["portable", "charging"]}
{"id": 402, "product": {"name": "Cable", "category": "Electronics"}, "quantity": 20, "tags": [unclosed, array}'''
# 5. Invalid Escape Sequences - corrupted string formatting (Detectable via Corrupt Record)
invalid_escape = '''{"id": 501, "product": {"name": "Laptop Stand", "category": "Electronics"}, "quantity": 7, "tags": ["laptop", "stand"]}
{"id": 502, "product": {"name": "Phone Case\\invalid", "category": "Electronics"}, "quantity": 25, "tags": ["phone", "protection"]}'''
# Write test files
files = {
    "invalid_syntax.json": invalid_syntax,
    "extra_fields.json": extra_fields,
    "type_mismatch.json": type_mismatch,
    "malformed_arrays.json": malformed_arrays,
    "invalid_escape.json": invalid_escape
}
for filename, content in files.items():
    dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)
print(f"Created {len(files)} JSON test files covering key corruption scenarios")
Rescue mode with a predefined schema excels at detecting JSON structure and type mismatches while preserving recoverable nested data.
# Configure Auto Loader with known schema and rescue mode for JSON
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaEvolutionMode", "rescue") # Automatic nested data rescue
.schema(schema) # Predefined JSON schema for validation
.load(input_path)
.select("*", "_metadata.*")
.writeStream
.format("delta")
.option("checkpointLocation", path + "checkpoint_1/")
.trigger(availableNow=True)
.start(path + "output_1/")
.awaitTermination()
)
# Analyze rescued data - detects Missing Fields and Type Mismatches in nested JSON
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())
print(f"=== SCENARIO 1: JSON RESCUE MODE WITH KNOWN SCHEMA ===")
print(f"Issues detected: Extra nested fields, type mismatches in objects/arrays")
print(f"Rescued records: {rescued_records.count()}")
display(rescued_records.select("id", "product.name", "_rescued_data", "file_name"))
Corrupt record detection captures completely malformed JSON that cannot be parsed due to syntax errors.
# Configure Auto Loader with corrupt record detection for JSON syntax issues
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", path + "schema_2/")
.option("cloudFiles.schemaHints", "_corrupt_record STRING") # Enable JSON corruption capture
.load(input_path)
.select("*", "_metadata.*")
.writeStream
.format("delta")
.option("checkpointLocation", path + "checkpoint_2/")
.trigger(availableNow=True)
.start(path + "output_2/")
.awaitTermination()
)
# Analyze corrupt records - detects Invalid Syntax, Malformed Arrays, Invalid Escapes
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 2: JSON CORRUPT RECORD DETECTION ===")
print(f"Issues detected: Invalid JSON syntax, malformed arrays, escape sequence errors")
print(f"Corrupt records: {corrupt_records.count()}")
display(corrupt_records.select("_corrupt_record", "file_name"))
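A common follow-up is to quarantine these records for manual review rather than letting them silently accumulate. The sketch below appends them to a separate Delta location; the quarantine path is a placeholder to adapt to your environment.
# Optional: quarantine unparseable records for manual review (quarantine path is a placeholder)
(corrupt_records
    .select("_corrupt_record", "file_name", "file_modification_time")
    .write
    .format("delta")
    .mode("append")
    .save(path + "quarantine/"))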
Combining rescue mode with corrupt record detection provides complete coverage of all five JSON corruption scenarios.
# Enhanced schema including corrupt record field for comprehensive JSON validation
schema_with_corrupt = StructType([
    StructField("id", IntegerType(), True),
    StructField("product", StructType([
        StructField("name", StringType(), True),
        StructField("category", StringType(), True)
    ]), True),
    StructField("quantity", IntegerType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("_corrupt_record", StringType(), True)
])
# Configure Auto Loader with comprehensive JSON quality detection
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema(schema_with_corrupt) # Complete schema with corruption handling
.option("cloudFiles.schemaEvolutionMode", "rescue") # Nested data rescue
.option("cloudFiles.schemaHints", "_corrupt_record STRING") # Syntax error capture
.load(input_path)
.select("*", "_metadata.*")
.writeStream
.format("delta")
.option("checkpointLocation", path + "checkpoint_3/")
.trigger(availableNow=True)
.start(path + "output_3/")
.awaitTermination()
)
# Comprehensive JSON quality analysis - detects ALL five corruption scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 3: COMPREHENSIVE JSON QUALITY DETECTION ===")
print(f"Total JSON records processed: {df_combined.count()}")
print(f"Records with quality issues: {all_issues.count()}")
print("Complete JSON validation: All 5 corruption types detected\n")
# Show detailed breakdown by issue type and source file
print("--- Complete JSON Quality Analysis ---")
display(all_issues.select("id", "product.name", "_rescued_data", "_corrupt_record", "file_name")
.orderBy("file_name"))
# JSON-specific detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ Extra Fields & Type Mismatches → Rescued: {rescued_count}")
print(f"✓ Syntax Errors, Array/Escape Issues → Corrupt: {corrupt_count}")
print(f"✓ JSON quality coverage: {((rescued_count + corrupt_count) / df_combined.count() * 100):.1f}%")
To implement robust JSON quality detection, start with rescue mode for low-overhead data continuity, add _corrupt_record capture where complete audit trails are required, and combine both approaches for full coverage.
Auto Loader's advanced JSON quality detection capabilities make it indispensable for modern data lakes handling complex nested data.
The combination of rescue mode and corrupt record detection provides comprehensive coverage for JSON-specific corruption scenarios, from syntax errors to nested structure mismatches.
Ready to implement these data quality patterns in your own pipelines? Start by testing the provided code examples in your Databricks environment and adapt them to your specific JSON schema requirements.
For deeper implementation guidance, explore the Auto Loader Schema Evolution documentation and the JSON-specific Auto Loader options. Additionally, stay tuned for our upcoming XML Data Ingestion with Built-in Data Audit and Validation blog post.