Databricks Auto Loader has become the standard for incremental data ingestion in cloud environments. It provides automatic schema inference, handles evolving data structures, and ensures exactly-once processing guarantees.
For comprehensive details on Auto Loader's capabilities, refer to the official Databricks Auto Loader documentation.
What makes Auto Loader exceptional is its built-in data quality and validation features, including sophisticated corrupt record handling that maintains pipeline reliability while preserving data integrity.
In real-world scenarios, you're often ingesting files from various upstream systems in multiple formats (CSV, JSON, XML, Parquet, etc.) that may contain corrupt data. This corruption can show up as data type mismatches, missing headers, incorrect delimiters, or extra and missing columns.
Traditional batch processing approaches often fail entire files due to corrupt records or silently skip corrupted data.
Auto Loader's robust corrupt data detection transforms this challenge into a manageable feature across all supported file formats.
Auto Loader provides comprehensive data validation across CSV, JSON, XML, Parquet, and other supported formats through two complementary mechanisms: a rescued data column and a corrupt record column.
This makes Auto Loader both an ingestion tool and a universal data audit solution that works consistently across file formats. Learn More: Configure schema inference and evolution in Auto Loader
The _rescued_data column captures data that doesn't conform to the expected schema: Auto Loader extracts the parseable portions of each record and places the non-conforming elements in _rescued_data. It is automatically enabled when using cloudFiles.schemaEvolutionMode: "rescue", with no additional configuration required, and it works seamlessly with schema inference across all supported formats. For detailed information, see the Auto Loader Schema Evolution documentation. Ideal for maximizing data recovery.
The _corrupt_record column stores completely unparseable records in their entirety while setting the other columns to null. It must be explicitly enabled via schema hints. Perfect for debugging and comprehensive corruption tracking across any file format.
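A minimal sketch of how each mechanism is switched on is shown below; the input path, schema location, and expected_schema are placeholders, and the fully worked examples follow later in this post.
# Rescued data column: enabled through rescue schema evolution mode with a predefined schema
rescue_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .schema(expected_schema)  # placeholder: your expected schema
    .load("/path/to/input/"))
# Corrupt record column: enabled explicitly via a schema hint
corrupt_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "/path/to/schema/")  # placeholder
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")
    .load("/path/to/input/"))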
Note: This analysis focuses on CSV format. Auto Loader provides similar corrupt data detection capabilities for JSON, XML, Parquet, and other supported formats.
import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Create unique path for experiment
path = f"/tmp/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)
# Expected schema for validation
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
# Five data quality scenarios:
# 1. DataType Mismatch - text in numeric field (Detectable via Rescue)
dataType_mismatch = """id,product_name,quantity
101,Mechanical Keyboard,25
102,Ergonomic Chair,N/A"""
# 2. Missing Header - no column names (Detectable via Rescue)
missing_header = """501,Webcam,20
502,USB-C Hub,15"""
# 3. Incorrect Delimiter - mixed delimiters (Detectable via Corrupt Record)
incorrect_delimiter = """id,product_name,quantity
301,Laptop Sleeve,40
302|Desk Mat|50"""
# 4. Additional Column - extra field (Detectable via Corrupt Record)
additional_column = """id,product_name,quantity
201,Mechanical Keyboard,25
202,Monitor Stand,30,Sold Out"""
# 5. Missing Column - incomplete record (Detectable via Corrupt Record)
missing_column = """id,product_name,quantity
401,Laptop Stand,35
402,Power Bank"""
# Write test files
files = {
    "dataType_mismatch.csv": dataType_mismatch,
    "additional_column.csv": additional_column,
    "incorrect_delimiter.csv": incorrect_delimiter,
    "missing_column.csv": missing_column,
    "missing_header.csv": missing_header
}
for filename, content in files.items():
    dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)
print(f"Created {len(files)} test files covering key data validation scenarios")
# Display the content of each file using dbutils.fs.head
print("--- Displaying File Contents ---")
print("DataType Mismatch:")
print(dbutils.fs.head(f"{input_path}dataType_mismatch.csv"))
print("Additional Column:")
print(dbutils.fs.head(f"{input_path}additional_column.csv"))
print("Incorrect Delimiter:")
print(dbutils.fs.head(f"{input_path}incorrect_delimiter.csv"))
print("Missing Column:")
print(dbutils.fs.head(f"{input_path}missing_column.csv"))
print("Missing Header:")
print(dbutils.fs.head(f"{input_path}missing_header.csv"))
When using rescue mode with a predefined schema, Auto Loader can detect data that doesn't conform to the expected structure while preserving the recoverable information.
# Configure Auto Loader with known schema and rescue mode
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaEvolutionMode", "rescue")  # Enables automatic data rescue
    .schema(schema)  # Using predefined schema for validation
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_1/")
    .trigger(availableNow=True)
    .start(path + "output_1/")
    .awaitTermination()
)
# Analyze rescued data - detects DataType Mismatches and Missing Headers
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())
print(f"=== SCENARIO 1: RESCUE MODE WITH KNOWN SCHEMA ===")
print(f"Issues detected: DataType Mismatch, Missing Header scenarios")
print(f"Rescued records: {rescued_records.count()}")
display(rescued_records.select("_rescued_data", "file_name"))
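Because _rescued_data holds a JSON string keyed by column name, individual offending values can be pulled back out with Spark's JSON functions. A minimal sketch, assuming the quantity field from the data-type-mismatch file is what was rescued:
from pyspark.sql.functions import get_json_object
# Extract the rescued 'quantity' value (the $.quantity key is an assumption based on the test schema)
display(rescued_records.select(
    "id",
    "product_name",
    get_json_object(col("_rescued_data"), "$.quantity").alias("rescued_quantity"),
    "file_name"))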
When using corrupt record detection without a predefined schema, Auto Loader relies on schema inference and detects completely unparseable records.
# Configure Auto Loader with corrupt record detection (no predefined schema)
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", path + "schema_2/")
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")  # Enable corrupt record capture
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_2/")
    .trigger(availableNow=True)
    .start(path + "output_2/")
    .awaitTermination()
)
# Analyze corrupt records - detects Incorrect Delimiter, Additional Column, Missing Column
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 2: CORRUPT RECORD WITHOUT SCHEMA ===")
print(f"Issues detected: Incorrect Delimiter, Additional Column, Missing Column scenarios")
print(f"Corrupt records: {corrupt_records.count()}")
display(corrupt_records.select("_corrupt_record", "file_name"))
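In a production pipeline these rows would typically be routed to a quarantine table rather than only displayed. A minimal sketch, assuming a quarantine/ folder under the same experiment path:
# Quarantine unparseable rows (with their source file info) for later inspection or reprocessing
(corrupt_records
    .select("_corrupt_record", "file_name", "file_path")
    .write
    .format("delta")
    .mode("append")
    .save(path + "quarantine/"))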
Using both rescue mode and corrupt record detection with a known schema provides comprehensive coverage of all data quality issues.
# Enhanced schema including corrupt record field
schema_with_corrupt = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])
# Configure Auto Loader with both rescue mode and corrupt record detection
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(schema_with_corrupt)  # Known schema with corrupt record field
    .option("cloudFiles.schemaEvolutionMode", "rescue")  # Auto rescue mode
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")  # Explicit corrupt detection
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_3/")
    .trigger(availableNow=True)
    .start(path + "output_3/")
    .awaitTermination()
)
# Comprehensive analysis - detects ALL five scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 3: ROBUST DETECTION WITH COMBINED APPROACH ===")
print(f"Total records processed: {df_combined.count()}")
print(f"Records with data quality issues: {all_issues.count()}")
print("Comprehensive detection: All 5 scenarios covered\n")
# Show detailed breakdown by issue type and source
print("--- Complete Data Quality Analysis ---")
display(all_issues.select("id", "product_name", "_rescued_data", "_corrupt_record", "file_name")
    .orderBy("file_name"))
# Detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ DataType Mismatch & Missing Header → Rescued: {rescued_count}")
print(f"✓ Incorrect Delimiter, Additional & Missing Columns → Corrupt: {corrupt_count}")
print(f"✓ Total coverage: {((rescued_count + corrupt_count) / df_combined.count() * 100):.1f}%")
To leverage both detection strategies, provide the expected schema with an explicit _corrupt_record field, enable rescue mode via cloudFiles.schemaEvolutionMode, and add the _corrupt_record STRING schema hint, as shown in the combined configuration above.
Auto Loader's corrupt record handling makes it a comprehensive data quality solution. Choose your approach based on your requirements: rescue mode with a known schema to maximize data recovery, corrupt record detection to capture completely unparseable records for debugging, or the combined approach for full coverage of all five scenarios.
With Auto Loader, you're building a foundation for data trust that scales with your organization's needs.