matta, Databricks Employee

Introduction: Auto Loader as the Go-To Ingestion Solution

Databricks Auto Loader has become the standard for incremental data ingestion in cloud environments. It provides automatic schema inference, handles evolving data structures, and delivers exactly-once processing guarantees.

For comprehensive details on Auto Loader's capabilities, refer to the official Databricks Auto Loader documentation.

What makes Auto Loader exceptional is its built-in data quality and validation features, including sophisticated corrupt record handling that maintains pipeline reliability while preserving data integrity.

The Use Case: Detecting Corrupt Data Across All Formats

In real-world scenarios, you're often ingesting files from various upstream systems in multiple formats (CSV, JSON, XML, Parquet, etc.) that may contain corrupt data. This corruption can manifest as:

  • Malformed structure that cannot be parsed
  • Data type mismatches (text in numeric fields)
  • Missing required fields or values
  • Invalid formatting or encoding issues

Traditional batch processing approaches often fail an entire file because of a few corrupt records, or silently skip the corrupted data.

Auto Loader's robust corrupt data detection transforms this challenge into a manageable feature across all supported file formats.

Auto Loader: A One-Stop File Audit and Validation Solution

Auto Loader provides comprehensive data validation across CSV, JSON, XML, Parquet, and other supported formats through:

  • Automatic Schema Inference: Adapts to changing data structures
  • Corrupt Record Isolation: Separates bad records without stopping pipelines
  • Built-in Rescue Mode: Automatically captures non-conforming data with cloudFiles.schemaEvolutionMode: "rescue"
  • No Pipeline Restarts: Handles schema evolution and data quality issues without interrupting ingestion
  • File-Level Tracking: Uses _metadata.file_name to trace corrupt records back to source files for precise audit trails
  • Flexible Processing Modes: Switch seamlessly between batch processing (availableNow=True) and real-time streaming (processingTime) without code changes (a short sketch follows below)
  • Comprehensive Metadata: Tracks processing statistics and error details
  • Flexible Handling Options: Multiple approaches for dealing with corrupt data

This makes Auto Loader both an ingestion tool and a universal data audit solution that works consistently across file formats.

Learn More: Configure schema inference and evolution in Auto Loader
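
To illustrate the flexible processing modes called out above, here is a minimal sketch of the same Auto Loader read driven by either trigger. The paths are illustrative placeholders, not values used in the examples later in this post.

# Minimal sketch: one Auto Loader definition, two processing modes.
# All paths are illustrative placeholders.
stream = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", "/tmp/example/schema/")
  .load("/tmp/example/input/"))

# Batch-style: process everything available now, then stop
(stream.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/example/checkpoint/")
  .trigger(availableNow=True)
  .start("/tmp/example/output/"))

# Streaming-style: swap the trigger for continuous micro-batches, e.g.
#   .trigger(processingTime="1 minute")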

Understanding the Two Key Options

1. _rescued_data Column

Captures data that doesn't conform to the expected schema by extracting the parseable portions of a record and placing the non-conforming elements in the _rescued_data column. It is enabled automatically when you use cloudFiles.schemaEvolutionMode: "rescue", with no additional configuration required, and works seamlessly with schema inference across all supported formats. For detailed information, see the Auto Loader Schema Evolution documentation. Ideal for maximizing data recovery.

2. _corrupt_record Column

Stores completely unparseable records in their entirety while setting other columns to null. Must be explicitly enabled via schema hints. Perfect for debugging and comprehensive corruption tracking across any file format.

Practical Examples with PySpark (CSV Focus)

Note: This analysis focuses on CSV format. Auto Loader provides similar corrupt data detection capabilities for JSON, XML, Parquet, and other supported formats.
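
As a quick illustration of that portability, here is a sketch of the same rescue-mode pattern applied to JSON input; only the format option changes. The paths below are placeholders, not part of the CSV walkthrough that follows.

# Sketch: rescue-mode ingestion for JSON input (paths are placeholders)
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "/tmp/example_json/schema/")
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("/tmp/example_json/input/")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/example_json/checkpoint/")
  .trigger(availableNow=True)
  .start("/tmp/example_json/output/")
  .awaitTermination()
)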

Scenario Preparation: Five Key Data Validation Scenarios

import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create unique path for experiment
path = f"/tmp/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)

# Expected schema for validation
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True)
])

# Five data quality scenarios:

# 1. DataType Mismatch - text in numeric field (Detectable via Rescue)
dataType_mismatch = """id,product_name,quantity
101,Mechanical Keyboard,25
102,Ergonomic Chair,N/A"""

# 2. Missing Header - no column names (Detectable via Rescue) 
missing_header = """501,Webcam,20
502,USB-C Hub,15"""

# 3. Incorrect Delimiter - mixed delimiters (Detectable via Corrupt Record)
incorrect_delimiter = """id,product_name,quantity
301,Laptop Sleeve,40
302|Desk Mat|50"""

# 4. Additional Column - extra field (Detectable via Corrupt Record)
additional_column = """id,product_name,quantity
201,Mechanical Keyboard,25
202,Monitor Stand,30,Sold Out"""

# 5. Missing Column - incomplete record (Detectable via Corrupt Record)
missing_column = """id,product_name,quantity
401,Laptop Stand,35
402,Power Bank"""

# Write test files
files = {
    "dataType_mismatch.csv": dataType_mismatch,
    "additional_column.csv": additional_column, 
    "incorrect_delimiter.csv": incorrect_delimiter,
    "missing_column.csv": missing_column,
    "missing_header.csv": missing_header
}

for filename, content in files.items():
    dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)
    
print(f"Created {len(files)} test files covering key data validation scenarios")

Scenario Preparation: Output

[Screenshot: output confirming the five test files were created]

Scenario Preparation: Display Output File Contents

# Display the content of each file using dbutils.fs.head
print("--- Displaying File Contents ---")
print("DataType Mismatch:")
print(dbutils.fs.head(f"{input_path}dataType_mismatch.csv"))

print("Additional Column:")
print(dbutils.fs.head(f"{input_path}additional_column.csv"))

print("Incorrect Delimiter:")
print(dbutils.fs.head(f"{input_path}incorrect_delimiter.csv"))

print("Missing Column:")
print(dbutils.fs.head(f"{input_path}missing_column.csv"))

print("Missing Header:")
print(dbutils.fs.head(f"{input_path}missing_header.csv"))

[Screenshot: printed contents of the five test CSV files]

Scenario 1: Rescue Mode with Known Schema - Targeted Detection

When using rescue mode with a predefined schema, Auto Loader can detect data that doesn't conform to expected structure while preserving recoverable information.

# Configure Auto Loader with known schema and rescue mode
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("cloudFiles.schemaEvolutionMode", "rescue")  # Enables automatic data rescue
  .schema(schema)  # Using predefined schema for validation
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_1/")
  .trigger(availableNow=True)
  .start(path + "output_1/")
  .awaitTermination()
)

# Analyze rescued data - detects DataType Mismatches and Missing Headers
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())

print(f"=== SCENARIO 1: RESCUE MODE WITH KNOWN SCHEMA ===")
print(f"Issues detected: DataType Mismatch, Missing Header scenarios")
print(f"Rescued records: {rescued_records.count()}")

display(rescued_records.select("_rescued_data", "file_name"))

Scenario 1: Output

[Screenshot: rescued records showing _rescued_data and file_name]
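
As an optional follow-up, the rescued payload is a JSON string, so it can be unpacked for easier inspection. The snippet below is only a sketch of that idea; it reuses the rescued_records DataFrame from above, and the map-based parsing is an illustrative choice rather than part of the original walkthrough.

from pyspark.sql.functions import col, from_json

# Sketch: unpack the JSON payload in _rescued_data into a map for inspection
rescued_parsed = rescued_records.withColumn(
    "rescued_map", from_json(col("_rescued_data"), "map<string,string>")
)

display(rescued_parsed.select("id", "product_name", "rescued_map", "file_name"))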

Scenario 2: Corrupt Record Detection without Schema - Structural Issues

When using corrupt record detection without a predefined schema, Auto Loader relies on schema inference and detects completely unparseable records.

# Configure Auto Loader with corrupt record detection (no predefined schema)
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("cloudFiles.schemaLocation", path + "schema_2/")
  .option("cloudFiles.schemaHints", "_corrupt_record STRING")  # Enable corrupt record capture
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_2/")
  .trigger(availableNow=True)
  .start(path + "output_2/")
  .awaitTermination()
)

# Analyze corrupt records - detects Incorrect Delimiter, Additional Column, Missing Column
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 2: CORRUPT RECORD WITHOUT SCHEMA ===")
print(f"Issues detected: Incorrect Delimiter, Additional Column, Missing Column scenarios")
print(f"Corrupt records: {corrupt_records.count()}")

display(corrupt_records.select("_corrupt_record", "file_name"))

Scenario 2: Output

[Screenshot: corrupt records showing _corrupt_record and file_name]
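
A common follow-up is to route the corrupt rows into a separate quarantine table for later inspection. The snippet below is only a sketch of that idea; it reuses the corrupt_records DataFrame from above, and the quarantine path is an illustrative assumption.

# Sketch: persist corrupt rows to a quarantine Delta table for later review
(corrupt_records
  .select("_corrupt_record", "file_name")
  .write
  .format("delta")
  .mode("append")
  .save(path + "quarantine/")
)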

Scenario 3: Combined Approach - Robust Detection of All Scenarios

Using both rescue mode and corrupt record detection with a known schema provides comprehensive coverage of all data quality issues.

# Enhanced schema including corrupt record field
schema_with_corrupt = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

# Configure Auto Loader with both rescue mode and corrupt record detection
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema(schema_with_corrupt)  # Known schema with corrupt record field
  .option("cloudFiles.schemaEvolutionMode", "rescue")        # Auto rescue mode
  .option("cloudFiles.schemaHints", "_corrupt_record STRING") # Explicit corrupt detection
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_3/")
  .trigger(availableNow=True)
  .start(path + "output_3/")
  .awaitTermination()
)

# Comprehensive analysis - detects ALL five scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 3: ROBUST DETECTION WITH COMBINED APPROACH ===")
print(f"Total records processed: {df_combined.count()}")
print(f"Records with data quality issues: {all_issues.count()}")
print("Comprehensive detection: All 5 scenarios covered\n")

# Show detailed breakdown by issue type and source
print("--- Complete Data Quality Analysis ---")
display(all_issues.select("id", "product_name", "_rescued_data", "_corrupt_record", "file_name")
        .orderBy("file_name"))

# Detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ DataType Mismatch & Missing Header → Rescued: {rescued_count}")
print(f"✓ Incorrect Delimiter, Additional & Missing Columns → Corrupt: {corrupt_count}")
print(f"✓ Total coverage: {((rescued_count + corrupt_count) / df_combined.count() * 100):.1f}%")

Scenario 3: Output

[Screenshot: combined rescued and corrupt record analysis with detection summary]

Achieving Both Approaches: Implementation Strategy

To leverage both detection strategies:

  1. Use rescue mode by default with cloudFiles.schemaEvolutionMode: "rescue" for automatic data recovery
  2. Add corrupt record schema hints to explicitly enable _corrupt_record column when needed
  3. Monitor both columns in your data quality dashboards
  4. Set up alerts when corruption rates exceed acceptable thresholds (a simple sketch follows this list)
  5. Implement downstream processing to handle each corruption type appropriately
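
As a starting point for steps 3 and 4, the sketch below computes a simple corruption rate over the Scenario 3 output and prints an alert when it crosses a threshold. The 5% threshold is an illustrative assumption; in practice the result would feed your monitoring or alerting tooling.

from pyspark.sql.functions import col

# Sketch: simple corruption-rate check over the Scenario 3 output
THRESHOLD = 0.05  # illustrative threshold: alert above 5% flagged rows

df_quality = spark.read.load(path + "output_3/")
total_rows = df_quality.count()
flagged_rows = df_quality.filter(
    col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull()
).count()
corruption_rate = flagged_rows / total_rows if total_rows else 0.0

if corruption_rate > THRESHOLD:
    print(f"ALERT: corruption rate {corruption_rate:.1%} exceeds {THRESHOLD:.0%}")
else:
    print(f"Corruption rate {corruption_rate:.1%} is within the acceptable threshold")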

Conclusion

Auto Loader's corrupt record handling makes it a comprehensive data quality solution. Choose your approach based on requirements:

  • _rescued_data for maximum data recovery
  • _corrupt_record for complete corruption tracking
  • Both for comprehensive data quality monitoring

With Auto Loader, you're building a foundation for data trust that scales with your organization's needs.