This blog demonstrates AutoLoader's advanced data quality detection capabilities for XML files.
In our previous blog posts "CSV Data Ingestion with Built-in Data Audit & Validation" and "JSON Data Ingestion with Built-in Data Audit & Validation", we explored AutoLoader's core capabilities for reliable data ingestion. Today, we dive deeper into AutoLoader's advanced data quality features, specifically focusing on XML file corruption detection and validation.
Consider a standard shipping manifest with the expected XML schema:
<ShipmentManifest id="SH-78901">
  <ShippingDate>2025-09-12</ShippingDate>
  <Carrier>Express &amp; Freight</Carrier>
  <Items>
    <Item SKU="HW-101">
      <Description>Industrial Grade Widget</Description>
      <Quantity>150</Quantity>
    </Item>
  </Items>
</ShipmentManifest>
XML files from various upstream systems frequently introduce structural and data quality problems that disrupt traditional ingestion pipelines. The common corruption patterns we'll address are malformed syntax (such as missing closing tags), invalid nesting, unescaped special characters, data type mismatches, unexpected attributes (schema drift), mismatched closing tags, and unquoted attribute values.
AutoLoader's built-in quality detection capabilities handle these challenges gracefully, with XML requiring special attention due to its hierarchical structure and strict syntax requirements.
AutoLoader provides comprehensive XML validation through rescue mode (which captures data quality issues and schema drift in the _rescued_data column), corrupt record detection (which captures structural parsing failures in the _corrupt_record column), and schema hints and schema evolution for controlled schema management.
This makes it ideal for XML-heavy data lakes where schema flexibility and data quality monitoring are critical.
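Before walking through the full scenarios, here is a minimal sketch of the reader configuration these capabilities rely on. The paths are placeholders, and the scenarios below show when to pin an explicit schema instead of relying on inference with a schema location.
# Minimal sketch: the core options behind XML quality detection in Auto Loader.
# The paths here are placeholders; swap in your own locations.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")                           # parse XML files
    .option("rowTag", "ShipmentManifest")                         # element that delimits one record
    .option("cloudFiles.schemaLocation", "/tmp/xml_schema/")      # placeholder: where the inferred schema is tracked
    .option("cloudFiles.schemaEvolutionMode", "rescue")           # capture drift and bad values in _rescued_data
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")   # capture unparseable records in _corrupt_record
    .load("/tmp/xml_input/"))                                     # placeholder input directory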
The following examples demonstrate AutoLoader's ability to handle XML-specific corruption scenarios using realistic shipping manifest data; AutoLoader offers similar capabilities for CSV, JSON, Parquet, and other formats. We'll create test files with different types of data quality issues that mirror real-world production challenges.
import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
# Create a single input path for all files
path = f"/tmp/autoloader_xml_scenarios/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)
# 1. Malformed Syntax (Structural Error -> Corrupt)
# Missing a closing </Item> tag.
malformed_syntax = """
<ShipmentManifest id="SH-78901">
<ShippingDate>2025-09-12</ShippingDate>
<Carrier>Express Freight</Carrier>
<Items>
<Item SKU="SW-202">
<Description>Analytics Software License</Description>
<Quantity>10</Quantity>
</Items>
</ShipmentManifest>
"""
# 2. Invalid Nesting (Structural Error -> Corrupt)
# The </Item> tag is outside the <Items> container.
invalid_nesting = """
<ShipmentManifest id="SH-78902">
<ShippingDate>2025-09-12</ShippingDate>
<Carrier>Standard Logistics</Carrier>
<Items>
<Item SKU="HW-303">
<Description>Reinforced Gearbox</Description>
<Quantity>20</Quantity>
</Items></Item>
</ShipmentManifest>
"""
# 3. Unescaped Special Characters (Data Quality Error -> Rescued)
# The '&' character should be escaped as '&amp;'.
unescaped_char = """
<ShipmentManifest id="SH-78903">
<ShippingDate>2025-09-13</ShippingDate>
<Carrier>ACME Shipping & Logistics</Carrier>
<Items>
<Item SKU="HW-404">
<Description>Hydraulic Press</Description>
<Quantity>1</Quantity>
</Item>
</Items>
</ShipmentManifest>
"""
# 4. Data Type Mismatch (Data Quality Error -> Rescued)
# 'Quantity' is 'Fifty' but the schema expects an integer.
datatype_mismatch = """
<ShipmentManifest id="SH-78904">
<ShippingDate>2025-09-14</ShippingDate>
<Carrier>Next Day Air</Carrier>
<Items>
<Item SKU="HW-505">
<Description>Precision Bearing Set</Description>
<Quantity>Fifty</Quantity>
</Item>
</Items>
</ShipmentManifest>
"""
# 5. Unexpected Attribute (Schema Drift -> Rescued)
# The 'status' attribute is not defined in the schema.
unexpected_attribute = """
<ShipmentManifest id="SH-78905">
<ShippingDate>2025-09-15</ShippingDate>
<Carrier>Global Transport</Carrier>
<Items>
<Item SKU="SW-606" status="backordered">
<Description>Firmware Update Key</Description>
<Quantity>200</Quantity>
</Item>
</Items>
</ShipmentManifest>
"""
# 6. Mismatched Closing Tag (Structural Error -> Corrupt)
# The <Description> tag is incorrectly closed with </Desc>.
mismatched_tag = """
<ShipmentManifest id="SH-98765">
<ShippingDate>2025-09-12</ShippingDate>
<Carrier>Rapid Couriers</Carrier>
<Items>
<Item SKU="HW-707">
<Description>Ergonomic Mouse</Desc>
<Quantity>50</Quantity>
</Item>
</Items>
</ShipmentManifest>
"""
# 7. Attribute Value Without Quotes (Structural Error -> Corrupt)
# The SKU attribute's value is not enclosed in quotes.
unquoted_attribute = """
<ShipmentManifest id="SH-98766">
<ShippingDate>2025-09-13</ShippingDate>
<Carrier>Metro Deliveries</Carrier>
<Items>
<Item SKU=HW-808>
<Description>Noise-Cancelling Headphones</Description>
<Quantity>25</Quantity>
</Item>
</Items>
</ShipmentManifest>
"""
# Write all test files to the input directory
files = {
    "1_malformed_syntax.xml": malformed_syntax,
    "2_invalid_nesting.xml": invalid_nesting,
    "3_unescaped_char.xml": unescaped_char,
    "4_datatype_mismatch.xml": datatype_mismatch,
    "5_unexpected_attribute.xml": unexpected_attribute,
    "6_mismatched_tag.xml": mismatched_tag,
    "7_unquoted_attribute.xml": unquoted_attribute
}

for filename, content in files.items():
    dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)

print(f"Created {len(files)} XML test files in: {input_path}")
Scenario 1: Rescue mode detects schema deviations but misses structural parsing errors.
# Define the strict schema, making key fields non-nullable to enforce validation
schema_rescue_only = StructType([
    StructField("_id", StringType(), True),
    StructField("ShippingDate", StringType(), True),
    StructField("Carrier", StringType(), True),
    StructField("Items", StructType([
        StructField("Item", ArrayType(StructType([
            StructField("_SKU", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), False)  # Non-nullable
        ])), True)
    ]), True)
])
# Configure AutoLoader for RESCUE mode only
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "ShipmentManifest")
    .option("cloudFiles.schemaEvolutionMode", "rescue")  # Captures schema mismatches
    .schema(schema_rescue_only)  # Enforce a strict schema
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_1/")
    .trigger(availableNow=True)
    .start(path + "output_1/")
    .awaitTermination()
)
# Analyze the rescued records
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())
print(f"=== SCENARIO 1: XML RESCUE MODE WITH STRICT SCHEMA ===")
print(f"Rescued records: {rescued_records.count()}")
display(rescued_records.select("_rescued_data", "file_name"))
Expected Output: Only the files with data quality issues appear here: 4_datatype_mismatch.xml (the Quantity value "Fifty"), 5_unexpected_attribute.xml (the undeclared "status" attribute), and 3_unescaped_char.xml (the bare "&", whose parsing issue results in recovered data).
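If you want to inspect what was captured, _rescued_data is a JSON string, so standard JSON functions apply. Here is a small sketch; the "_file_path" key, which Auto Loader uses to record the source file inside the rescued payload, is an assumption to verify in your environment.
from pyspark.sql.functions import col, get_json_object

# Drill into the rescued payload: pull the embedded source-file reference
# (key name "_file_path" assumed) alongside the raw JSON for inspection.
rescued_detail = rescued_records.select(
    col("file_name"),
    get_json_object(col("_rescued_data"), "$._file_path").alias("rescued_from"),
    col("_rescued_data")
)
display(rescued_detail)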
Scenario 2: Corrupt record mode catches parsing failures but misses data quality issues.
# Configure AutoLoader with corrupt record detection for XML syntax issues
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "ShipmentManifest")
    .option("cloudFiles.schemaLocation", path + "schema_2/")
    .option("cloudFiles.schemaHints", "_corrupt_record STRING")  # Enable XML corruption capture
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_2/")
    .trigger(availableNow=True)
    .start(path + "output_2/")
    .awaitTermination()
)
# Analyze the corrupt records
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 2: XML CORRUPT RECORD DETECTION ===")
print(f"Corrupt records: {corrupt_records.count()}")
display(corrupt_records.select("_corrupt_record", "file_name"))
Expected Output: Only the files with structural XML parsing errors appear here: 1_malformed_syntax.xml, 2_invalid_nesting.xml, 6_mismatched_tag.xml, and 7_unquoted_attribute.xml.
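In production you would typically quarantine these records rather than only display them. The following is a sketch; the quarantine path is an arbitrary location chosen for this demo.
# Quarantine sketch: persist structurally corrupt records to their own Delta path
# so clean records can continue downstream unaffected.
quarantine_path = path + "quarantine_corrupt/"  # arbitrary demo location

(corrupt_records
    .select("_corrupt_record", "file_name", "file_path")
    .write
    .format("delta")
    .mode("append")
    .save(quarantine_path))

print(f"Quarantined {corrupt_records.count()} corrupt records to: {quarantine_path}")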
Scenario 3: A single configuration correctly identifies and categorizes all XML error types.
# Define a schema that includes the non-nullable fields AND the _corrupt_record column
schema_comprehensive = StructType([
    StructField("_id", StringType(), True),
    StructField("ShippingDate", StringType(), True),
    StructField("Carrier", StringType(), True),
    StructField("Items", StructType([
        StructField("Item", ArrayType(StructType([
            StructField("_SKU", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), False)
        ])), True)
    ]), True),
    StructField("_corrupt_record", StringType(), True)  # Explicitly define for capture
])
# Configure AutoLoader for comprehensive quality detection
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "ShipmentManifest")
    .schema(schema_comprehensive)  # Apply complete schema with corruption handling
    .option("cloudFiles.schemaEvolutionMode", "rescue")  # Handle data quality & schema drift
    .load(input_path)
    .select("*", "_metadata.*")
    .writeStream
    .format("delta")
    .option("checkpointLocation", path + "checkpoint_3/")
    .trigger(availableNow=True)
    .start(path + "output_3/")
    .awaitTermination()
)
# Comprehensive XML quality analysis - detects ALL seven corruption scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())
print(f"=== SCENARIO 3: COMPREHENSIVE XML QUALITY DETECTION ===")
print(f"Total XML records processed: {df_combined.count()}")
print(f"Records with quality issues: {all_issues.count()}")
print("Complete XML validation: All 7 corruption types detected\n")
# Show detailed breakdown by issue type and source file
print("--- Complete XML Quality Analysis ---")
display(all_issues.select("_rescued_data", "_corrupt_record", "file_name")
        .orderBy("file_name"))
# XML-specific detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ Data Quality & Schema Drift → Rescued: {rescued_count}")
print(f"✓ Structural & Parsing Errors → Corrupt: {corrupt_count}")
print(f"✓ XML quality coverage: {((rescued_count + corrupt_count) / df_combined.count() * 100):.1f}%")
To implement robust XML quality detection, define a strict schema with non-nullable key fields, include an explicit _corrupt_record column (or add it via cloudFiles.schemaHints), enable rescue mode, and monitor the _rescued_data and _corrupt_record columns downstream, routing each record class to the appropriate table as shown in the sketch below.
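One way to operationalize this, building on the Scenario 3 output, is sketched here; the target table names are hypothetical and should be adapted to your catalog.
from pyspark.sql.functions import col

# Split the Scenario 3 output into clean, rescued, and corrupt record sets.
clean_df = df_combined.filter(col("_rescued_data").isNull() & col("_corrupt_record").isNull())
rescued_df = df_combined.filter(col("_rescued_data").isNotNull())
corrupt_df = df_combined.filter(col("_corrupt_record").isNotNull())

# Hypothetical target tables; adjust to your own naming conventions.
clean_df.write.mode("append").saveAsTable("shipping.manifests_clean")
rescued_df.write.mode("append").saveAsTable("shipping.manifests_rescued")
corrupt_df.write.mode("append").saveAsTable("shipping.manifests_corrupt")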
AutoLoader's advanced XML quality detection capabilities make it indispensable for modern data lakes handling complex hierarchical data.
The combination of rescue mode and corrupt record detection provides comprehensive coverage for XML-specific corruption scenarios, from syntax errors to schema violations.
By implementing the dual-detection approach demonstrated in this blog, you can ensure that your XML data ingestion pipelines are both resilient and auditable, capturing every type of data quality issue while maintaining operational continuity.
Ready to implement these data quality patterns in your own pipelines? Start by testing the provided code examples in your Databricks environment and adapt them to your specific XML schema requirements.
For deeper implementation guidance, explore the AutoLoader Schema Evolution and Schema Inference documentation and the XML-specific AutoLoader options.