matta
Databricks Employee

Summary

This blog demonstrates Auto Loader's advanced data quality detection capabilities for JSON files:

  • Dual Detection Approaches: Leverage rescued data (the _rescued_data column) for partial data recovery and _corrupt_record for complete failure capture
  • Five Corruption Patterns: Handle invalid syntax, extra fields, type mismatches, malformed arrays, and escape sequence errors
  • Practical Implementation: Working PySpark code with real examples
  • Production Strategy: Use rescued data for data continuity, add _corrupt_record for audit trails and debugging
  • File-Level Tracking: Implement metadata-based source tracking for precise issue resolution

Introduction: Building on Auto Loader Fundamentals

In our previous blog post, "Introduction: Auto Loader as the Go-To Ingestion Solution", we explored Auto Loader's core capabilities for reliable data ingestion, focusing on CSV format.

This post explores Auto Loader's advanced data quality features, focusing specifically on JSON file corruption detection and validation.

Instead of failing the pipeline on bad input, Auto Loader isolates problem records so they can be audited later, which makes it well suited to production data environments.

The Challenge: JSON Data Quality in Production

JSON files from various upstream systems often contain structural and data quality issues that can break traditional ingestion pipelines. Consider a standard product record with an expected schema:

{"id": 101, "product": {"name": "Wireless Mouse", "category": "Electronics"}, "quantity": 10, "tags": ["wireless", "usb"]}

Common corruption patterns, among many others, include:

  • Invalid JSON Syntax: Missing quotes or brackets prevent parsing
    ({id: 102, product: {name: Gaming Keyboard, category: Electronics}})
  • Extra Nested Fields: Additional properties beyond the expected schema
    ("extra_property": "bonus_data" or "unexpected_field": "additional_info")
  • Type Mismatch in Objects: Wrong data types in nested structures
    ("quantity": "N/A" instead of a number or "category": 123 instead of a string)
  • Malformed Arrays: Invalid array syntax with unclosed elements
    ("tags": [unclosed, array})
  • Invalid Escape Sequences: Corrupted string formatting with bad escapes
    ("name": "Phone Case\invalid", where \i is not a valid JSON escape)

Auto Loader's built-in quality detection capabilities handle these challenges gracefully across all supported formats, with JSON requiring special attention due to its flexible, nested structure.
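
To make the difference between these patterns concrete, here is a minimal plain-Python sketch that sorts a single JSON line into "corrupt", "type_mismatch", "extra_fields", or "clean". It uses only the standard json module rather than Auto Loader, so it only approximates how records end up split between the two detection columns. The classify helper and the EXPECTED mappings are illustrative names, not part of any Databricks API.

```python
import json

# Expected fields and Python types, mirroring the sample product record.
EXPECTED = {"id": int, "product": dict, "quantity": int, "tags": list}
EXPECTED_PRODUCT = {"name": str, "category": str}


def classify(line: str) -> str:
    """Classify one JSON line as 'corrupt', 'type_mismatch', 'extra_fields', or 'clean'."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        # Missing quotes, unclosed arrays, and bad escapes all fail to parse.
        return "corrupt"
    if not isinstance(record, dict):
        return "corrupt"

    product = record.get("product")
    product_fields = product if isinstance(product, dict) else {}

    # Type mismatch: a field is present but its value has the wrong type.
    for name, expected_type in EXPECTED.items():
        if name in record and not isinstance(record[name], expected_type):
            return "type_mismatch"
    for name, expected_type in EXPECTED_PRODUCT.items():
        if name in product_fields and not isinstance(product_fields[name], expected_type):
            return "type_mismatch"

    # Extra fields: keys that the expected schema does not know about.
    if set(record) - set(EXPECTED) or set(product_fields) - set(EXPECTED_PRODUCT):
        return "extra_fields"
    return "clean"


samples = [
    '{"id": 301, "product": {"name": "USB Hub", "category": "Electronics"}, "quantity": "N/A", "tags": ["usb"]}',
    '{"id": 402, "quantity": 20, "tags": [unclosed, array}',
    r'{"id": 502, "product": {"name": "Phone Case\invalid", "category": "Electronics"}, "quantity": 25, "tags": []}',
]
for sample in samples:
    print(classify(sample))
```

The first two categories that parse successfully correspond roughly to what rescue mode preserves, while "corrupt" corresponds to records that cannot be parsed at all.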

Auto Loader: Your Data Quality Safety Net

Auto Loader provides comprehensive JSON validation through:

  • Automatic Schema Inference: Adapts to complex nested JSON structures
  • Corrupt Record Isolation: Captures malformed JSON without pipeline failure
  • Built-in Rescue Mode: Automatically preserves partial data from corrupted records
  • No Pipeline Restarts: Continues processing despite data quality issues
  • File-Level Tracking: Uses _metadata.file_name to trace the problems to source files
  • Flexible Processing Modes: Switch between batch (availableNow=True) and streaming (processingTime="<interval>", for example "1 minute") triggers
  • Nested Structure Support: Handles complex JSON hierarchies and arrays

Auto Loader is ideal for JSON-heavy data lakes where schema flexibility and data quality monitoring are critical.
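
As a small illustration of the batch versus streaming switch listed above, the sketch below builds the keyword arguments for the writer's trigger() call from a mode name. The trigger_options helper, its mode names, and the default interval are assumptions made for this example; only the availableNow and processingTime trigger arguments come from PySpark itself.

```python
def trigger_options(mode: str, interval: str = "1 minute") -> dict:
    """Return keyword arguments for DataStreamWriter.trigger()."""
    if mode == "batch":
        # Process everything currently available, then stop.
        return {"availableNow": True}
    if mode == "streaming":
        # Keep running and start a micro-batch at each interval.
        return {"processingTime": interval}
    raise ValueError(f"Unknown mode: {mode!r}")


# Usage, assuming an existing streaming DataFrame named df:
#   df.writeStream.trigger(**trigger_options("batch"))
print(trigger_options("batch"))
print(trigger_options("streaming", "30 seconds"))
```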

Detection Approaches: _rescued_data vs. _corrupt_record

Auto Loader offers two complementary approaches for JSON quality detection:

Rescued data (_rescued_data): Automatically captures non-conforming data, such as unexpected fields and type mismatches, when cloudFiles.schemaEvolutionMode is set to "rescue". Well suited to preserving partial information from complex JSON structures.

Corrupt records (_corrupt_record): Stores completely unparseable JSON records when explicitly enabled via schema hints. Essential for debugging malformed JSON syntax.

For production environments prioritizing data continuity, rescued data keeps the parseable portions of each record with minimal overhead.

When comprehensive audit trails and detailed debugging capabilities are required, _corrupt_record becomes essential despite requiring additional schema hints, as it captures complete record failures for thorough analysis.
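
The two approaches differ mainly in which reader options are set. The following sketch collects those options in a dictionary so the choice is explicit in one place. The json_quality_options helper and its parameters are hypothetical names for this example; the option keys and values are the ones used in the scenarios later in this post.

```python
from typing import Optional


def json_quality_options(capture_corrupt: bool = False,
                         schema_location: Optional[str] = None) -> dict:
    """Build Auto Loader reader options for JSON quality detection."""
    options = {
        "cloudFiles.format": "json",
        # Rescue mode: keep non-conforming data instead of evolving the schema.
        "cloudFiles.schemaEvolutionMode": "rescue",
    }
    if schema_location is not None:
        # Needed when Auto Loader infers the schema rather than receiving one.
        options["cloudFiles.schemaLocation"] = schema_location
    if capture_corrupt:
        # Schema hint that adds a column for unparseable records.
        options["cloudFiles.schemaHints"] = "_corrupt_record STRING"
    return options


# Usage, assuming a Databricks notebook where spark is defined:
#   spark.readStream.format("cloudFiles").options(**json_quality_options(True, "/tmp/schema/"))
print(json_quality_options())
print(json_quality_options(capture_corrupt=True, schema_location="/tmp/schema/"))
```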

Practical Examples with JSON Files

The following examples demonstrate JSON-specific corruption scenarios. Auto Loader provides similar capabilities for CSV, XML, Parquet, and other formats.

Scenario Preparation: JSON Data Quality Test Cases

The test cases below exercise Auto Loader's handling of various JSON corruption scenarios in a controlled environment.

  • We'll create five test files, each containing different types of data quality issues that mirror real-world production challenges.
  • Each file contains two records. Most pair one clean record with one problematic record, which lets us observe how Auto Loader's detection mechanisms identify and isolate quality issues while preserving valid data.
  • This approach enables us to validate both detection accuracy and data recovery capabilities across different corruption patterns.
import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

# Create unique path for experiment
path = f"/tmp/autoloader_json_quality/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)

# Expected schema for JSON validation
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product", StructType([
        StructField("name", StringType(), True),
        StructField("category", StringType(), True)
    ]), True),
    StructField("quantity", IntegerType(), True),
    StructField("tags", ArrayType(StringType()), True)
])

# Five JSON corruption scenarios:

# 1. Invalid JSON Syntax - missing quotes and brackets (Detectable via Corrupt Record)
invalid_syntax = '''{"id": 101, "product": {"name": "Wireless Mouse", "category": "Electronics"}, "quantity": 10, "tags": ["wireless", "usb"]}
{id: 102, product: {name: Gaming Keyboard, category: Electronics}, quantity: 15, tags: [gaming, rgb]}'''

# 2. Extra Nested Fields - additional properties beyond expected schema (Detectable via Rescue)
extra_fields = '''{"id": 201, "product": {"name": "Monitor Stand", "category": "Electronics", "extra_property": "bonus_data"}, "quantity": 5, "tags": ["desk", "ergonomic"]}
{"id": 202, "product": {"name": "Desk Pad", "category": "Electronics"}, "quantity": 8, "tags": ["portable"], "unexpected_field": "additional_info"}'''

# 3. Type Mismatch in Nested Objects - wrong data types (Detectable via Rescue)
type_mismatch = '''{"id": 301, "product": {"name": "USB Hub", "category": "Electronics"}, "quantity": "N/A", "tags": ["usb", "hub"]}
{"id": 302, "product": {"name": "Webcam", "category": 123}, "quantity": 3, "tags": "single-tag"}'''

# 4. Malformed Nested Arrays - invalid array syntax (Detectable via Corrupt Record)
malformed_arrays = '''{"id": 401, "product": {"name": "Power Bank", "category": "Electronics"}, "quantity": 12, "tags": ["portable", "charging"]}
{"id": 402, "product": {"name": "Cable", "category": "Electronics"}, "quantity": 20, "tags": [unclosed, array}'''

# 5. Invalid Escape Sequences - corrupted string formatting (Detectable via Corrupt Record)
invalid_escape = '''{"id": 501, "product": {"name": "Laptop Stand", "category": "Electronics"}, "quantity": 7, "tags": ["laptop", "stand"]}
{"id": 502, "product": {"name": "Phone Case\\invalid", "category": "Electronics"}, "quantity": 25, "tags": ["phone", "protection"]}'''

# Write test files
files = {
    "invalid_syntax.json": invalid_syntax,
    "extra_fields.json": extra_fields,
    "type_mismatch.json": type_mismatch,
    "malformed_arrays.json": malformed_arrays,
    "invalid_escape.json": invalid_escape
}

for filename, content in files.items():
    dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)
    
print(f"Created {len(files)} JSON test files covering key corruption scenarios")


Scenario 1: Rescue Mode with Known Schema - Nested Structure Issues

Rescue mode with a predefined schema detects JSON structure and type mismatches while preserving recoverable nested data.

# Configure Auto Loader with known schema and rescue mode for JSON
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaEvolutionMode", "rescue")  # Automatic nested data rescue
  .schema(schema)  # Predefined JSON schema for validation
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_1/")
  .trigger(availableNow=True)
  .start(path + "output_1/")
  .awaitTermination()
)

# Analyze rescued data - detects extra fields and type mismatches in nested JSON
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())

print(f"=== SCENARIO 1: JSON RESCUE MODE WITH KNOWN SCHEMA ===")
print(f"Issues detected: Extra nested fields, type mismatches in objects/arrays")
print(f"Rescued records: {rescued_records.count()}")

display(rescued_records.select("id", "product.name", "_rescued_data", "file_name"))

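Once rescued records are isolated, it helps to see exactly which fields were rescued. The sketch below parses one _rescued_data value in plain Python, assuming the value is a JSON document that also carries the source file path under a _file_path key; verify that shape against your own output. The rescued_fields helper and the sample string are hypothetical and were not taken from a real run.

```python
import json


def rescued_fields(rescued_data: str) -> dict:
    """Return the rescued field names and values, dropping the file-path entry."""
    payload = json.loads(rescued_data)
    return {key: value for key, value in payload.items() if key != "_file_path"}


# Hypothetical value, shaped like a rescued type mismatch from type_mismatch.json.
sample = '{"quantity": "N/A", "_file_path": "dbfs:/tmp/example/input/type_mismatch.json"}'
print(rescued_fields(sample))
```

Applied to collected rows, a helper like this makes it easy to count which fields are rescued most often.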

Scenario 2: Corrupt Record Detection - JSON Syntax Failures

Corrupt record detection captures completely malformed JSON that cannot be parsed due to syntax errors.

# Configure Auto Loader with corrupt record detection for JSON syntax issues
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", path + "schema_2/")
  .option("cloudFiles.schemaHints", "_corrupt_record STRING")  # Enable JSON corruption capture
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_2/")
  .trigger(availableNow=True)
  .start(path + "output_2/")
  .awaitTermination()
)

# Analyze corrupt records - detects Invalid Syntax, Malformed Arrays, Invalid Escapes
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 2: JSON CORRUPT RECORD DETECTION ===")
print(f"Issues detected: Invalid JSON syntax, malformed arrays, escape sequence errors")
print(f"Corrupt records: {corrupt_records.count()}")

display(corrupt_records.select("_corrupt_record", "file_name"))

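When debugging captured corrupt records, a quick way to locate the failure is to re-parse the raw string outside Spark. The sketch below does this with Python's standard json module; its error messages and strictness may differ from the parser Auto Loader uses, so treat the result as a hint rather than the actual reason the record was rejected. The diagnose helper is a hypothetical name for this example.

```python
import json
from typing import Optional


def diagnose(raw: str) -> Optional[str]:
    """Return a parse-error description for a raw JSON string, or None if it parses."""
    try:
        json.loads(raw)
    except json.JSONDecodeError as err:
        return f"{err.msg} at line {err.lineno}, column {err.colno}"
    return None


# Raw strings as they might appear in the _corrupt_record column.
for raw in ['{id: 102, product: {name: Gaming Keyboard}}',
            '{"id": 402, "tags": [unclosed, array}']:
    print(diagnose(raw))
```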

Scenario 3: Combined Approach - Comprehensive JSON Quality Detection

Combining rescue mode and corrupt record detection provides complete coverage of the five corruption scenarios above.

# Enhanced schema including corrupt record field for comprehensive JSON validation
schema_with_corrupt = StructType([
    StructField("id", IntegerType(), True),
    StructField("product", StructType([
        StructField("name", StringType(), True),
        StructField("category", StringType(), True)
    ]), True),
    StructField("quantity", IntegerType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("_corrupt_record", StringType(), True)
])

# Configure Auto Loader with comprehensive JSON quality detection
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema(schema_with_corrupt)  # Complete schema with corruption handling
  .option("cloudFiles.schemaEvolutionMode", "rescue")        # Nested data rescue
  .option("cloudFiles.schemaHints", "_corrupt_record STRING") # Syntax error capture
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_3/")
  .trigger(availableNow=True)
  .start(path + "output_3/")
  .awaitTermination()
)

# Comprehensive JSON quality analysis - detects ALL five corruption scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 3: COMPREHENSIVE JSON QUALITY DETECTION ===")
print(f"Total JSON records processed: {df_combined.count()}")
print(f"Records with quality issues: {all_issues.count()}")
print("Complete JSON validation: All 5 corruption types detected\n")

# Show detailed breakdown by issue type and source file
print("--- Complete JSON Quality Analysis ---")
display(all_issues.select("id", "product.name", "_rescued_data", "_corrupt_record", "file_name")
        .orderBy("file_name"))

# JSON-specific detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ Extra Fields & Type Mismatches → Rescued: {rescued_count}")
print(f"✓ Syntax Errors, Array/Escape Issues → Corrupt: {corrupt_count}")
print(f"✓ Share of records flagged with quality issues: {(all_issues.count() / df_combined.count() * 100):.1f}%")

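The summary counts above can be reasoned about without Spark. The following plain-Python sketch computes the same figures over a list of row dictionaries (for example, rows converted with asDict() after collecting a small sample) and counts a row only once even if both detection columns are populated. The summarize helper and the sample rows are illustrative assumptions, not output from the pipeline.

```python
from typing import Iterable, Mapping


def summarize(rows: Iterable[Mapping]) -> dict:
    """Count rescued, corrupt, and flagged rows without double counting."""
    total = rescued = corrupt = flagged = 0
    for row in rows:
        total += 1
        is_rescued = row.get("_rescued_data") is not None
        is_corrupt = row.get("_corrupt_record") is not None
        rescued += is_rescued
        corrupt += is_corrupt
        # A row with both columns populated still counts as one flagged row.
        flagged += is_rescued or is_corrupt
    share = flagged / total * 100 if total else 0.0
    return {"total": total, "rescued": rescued, "corrupt": corrupt,
            "flagged": flagged, "flagged_pct": round(share, 1)}


# Hypothetical rows for illustration only.
rows = [
    {"id": 101, "_rescued_data": None, "_corrupt_record": None},
    {"id": 301, "_rescued_data": '{"quantity": "N/A"}', "_corrupt_record": None},
    {"id": None, "_rescued_data": None, "_corrupt_record": "{id: 102}"},
]
print(summarize(rows))
```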

Implementation Strategy for JSON Quality Monitoring

To implement robust JSON quality detection:

  1. Start with rescue mode by setting cloudFiles.schemaEvolutionMode to "rescue" for automatic nested data recovery
  2. Add corrupt record detection via schema hints for JSON syntax error capture
  3. Define nested schemas that match your expected JSON structure for better validation
  4. Monitor both detection columns in data quality dashboards with file-level tracking
  5. Set up alerts for JSON corruption rate thresholds
  6. Implement downstream processing to handle each JSON corruption type appropriately
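
Steps 5 and 6 can be sketched in a few lines of plain Python. The route function decides where a record should go based on the two detection columns, and should_alert compares the corruption rate against a threshold. Both helper names, the destination labels, and the 5% default threshold are assumptions chosen for illustration; in a pipeline, the same logic would typically be expressed as DataFrame filters.

```python
def route(row: dict) -> str:
    """Pick a destination for a record based on the detection columns."""
    if row.get("_corrupt_record") is not None:
        # Nothing could be parsed: keep the raw text for debugging.
        return "quarantine"
    if row.get("_rescued_data") is not None:
        # Parsed, but some values did not fit the schema.
        return "review"
    return "clean"


def should_alert(corrupt_count: int, total_count: int, threshold_pct: float = 5.0) -> bool:
    """Return True when the corruption rate exceeds the threshold (in percent)."""
    if total_count == 0:
        return False
    return corrupt_count / total_count * 100 > threshold_pct


print(route({"id": 101, "_rescued_data": None, "_corrupt_record": None}))
print(should_alert(corrupt_count=3, total_count=10))
```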

Conclusion

Auto Loader's advanced JSON quality detection capabilities make it indispensable for modern data lakes handling complex nested data. 

The combination of rescue mode and corrupt record detection provides comprehensive coverage for JSON-specific corruption scenarios, from syntax errors to nested structure mismatches.

What's Next?

Ready to implement these data quality patterns in your own pipelines? Start by testing the provided code examples in your Databricks environment and adapt them to your specific JSON schema requirements. 

For deeper implementation guidance, explore the Auto Loader Schema Evolution Documentation and JSON-specific Auto Loader Options. Additionally, stay tuned for our upcoming XML Data Ingestion with Built-in Data Audit and Validation blog post.