Summary

This blog demonstrates AutoLoader's advanced data quality detection capabilities for XML files:

  • Common XML Corruption Patterns: Handle malformed XML syntax, invalid nesting, unescaped characters, data type mismatches, unexpected attributes, mismatched tags, and unquoted attributes
  • Practical Implementation: Working PySpark code with realistic shipping manifest examples covering all seven corruption scenarios
  • Production Strategy: Use _rescued_data for data continuity, add _corrupt_record for audit trails and debugging

Introduction: Building on AutoLoader Fundamentals

In our previous blog posts "CSV Data Ingestion with Built-in Data Audit & Validation" and "JSON Data Ingestion with Built-in Data Audit & Validation", we explored AutoLoader's core capabilities for reliable data ingestion. Today, we dive deeper into AutoLoader's advanced data quality features, specifically focusing on XML file corruption detection and validation.

The Challenge: XML Data Quality in Production

Consider a standard shipping manifest with the expected XML schema:

<ShipmentManifest id="SH-78901">
  <ShippingDate>2025-09-12</ShippingDate>
  <Carrier>Express &amp; Freight</Carrier>
  <Items>
    <Item SKU="HW-101">
      <Description>Industrial Grade Widget</Description>
      <Quantity>150</Quantity>
    </Item>
  </Items>
</ShipmentManifest>

XML files from various upstream systems frequently introduce structural and data quality problems that disrupt traditional ingestion pipelines. Below are some common corruption patterns we'll address:

  1. Malformed XML Syntax: Missing closing tags prevent parsing. Example: </Item> not present
  2. Invalid Nesting: Closing tags appear in the wrong order. Example: </Item> placed outside its <Items> container
  3. Unescaped Characters: Special characters break XML parsing. Example: in "Express & Freight", the '&' character should be escaped as '&amp;', i.e. "Express &amp; Freight"
  4. Data Type Mismatches: Wrong types violate schema expectations. Example: <Quantity>Fifty</Quantity> instead of the number <Quantity>50</Quantity>
  5. Unexpected Attributes: Additional properties cause schema drift. Example: <Item SKU="SW-606" status="backordered"> instead of <Item SKU="SW-606">
  6. Mismatched Closing Tags: Incorrect tag names break parsing. Example: <Description> closed with </Desc> instead of </Description>
  7. Unquoted Attribute Values: Missing quotes around attribute values. Example: SKU=HW-808 instead of SKU="HW-808"
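
As a quick local sanity check, a strict standard-library parser shows which of these patterns violate XML well-formedness outright. Below is a minimal sketch (note that ElementTree is stricter than Spark's XML reader: it also rejects the unescaped '&', which AutoLoader instead treats as a rescuable data quality issue):

# Sketch: classify sample snippets by XML well-formedness using the
# Python standard library. ElementTree raises ParseError for any
# structural violation (mismatched tags, unquoted attributes, and
# also unescaped '&', which it treats as a syntax error).
import xml.etree.ElementTree as ET

samples = {
    "mismatched_tag": "<Item><Description>Mouse</Desc></Item>",
    "unquoted_attribute": "<Item SKU=HW-808><Quantity>25</Quantity></Item>",
    "well_formed": '<Item SKU="HW-101"><Quantity>150</Quantity></Item>',
}

for name, snippet in samples.items():
    try:
        ET.fromstring(snippet)
        print(f"{name}: well-formed")
    except ET.ParseError as err:
        print(f"{name}: not well-formed ({err})")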

AutoLoader's built-in quality detection capabilities handle these challenges gracefully, with XML requiring special attention due to its hierarchical structure and strict syntax requirements.

AutoLoader: Your XML Data Quality Safety Net

AutoLoader provides comprehensive XML validation through:

  • Automatic Schema Inference: Dynamically adapts to XML structure variations
  • Corrupt Record Isolation: Separates unparseable records for debugging
  • Built-in Rescue Mode: Preserves partial data from schema mismatches
  • No Pipeline Restarts: Continues processing despite quality issues
  • File-Level Tracking: Traces issues back to source files via _metadata.file_name
  • Hierarchical Structure Support: Handles complex nested XML elements

This makes it ideal for XML-heavy data lakes where schema flexibility and data quality monitoring are critical.
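
Before walking through the corruption scenarios, here is a minimal sketch of the baseline AutoLoader XML configuration the later examples build on; all paths are placeholders to adapt to your environment:

# Minimal AutoLoader XML ingestion sketch (all paths are placeholders).
# rowTag tells the XML reader which element delimits one record;
# cloudFiles.schemaLocation lets AutoLoader persist the inferred schema.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "ShipmentManifest")
  .option("cloudFiles.schemaLocation", "/tmp/example/schema/")
  .load("/tmp/example/input/")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/example/checkpoint/")
  .trigger(availableNow=True)
  .start("/tmp/example/output/"))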

Practical Examples with XML Files

The following examples demonstrate XML-specific corruption scenarios. AutoLoader provides similar capabilities for CSV, JSON, Parquet, and other formats.

Scenario Preparation: XML Shipping Manifest Test Cases

The following examples demonstrate AutoLoader's ability to handle various XML corruption scenarios using realistic shipping manifest data. We'll create seven test files, each exhibiting one of the corruption patterns described above, mirroring real-world production challenges.

import time
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType


# Create a single input path for all files
path = f"/tmp/autoloader_xml_scenarios/{int(time.time())}/"
input_path = path + "input/"
dbutils.fs.mkdirs(input_path)


# 1. Malformed Syntax (Structural Error -> Corrupt)
# Missing a closing </Item> tag.
malformed_syntax = """
<ShipmentManifest id="SH-78901">
 <ShippingDate>2025-09-12</ShippingDate>
 <Carrier>Express Freight</Carrier>
 <Items>
   <Item SKU="SW-202">
     <Description>Analytics Software License</Description>
     <Quantity>10</Quantity>
   </Items>
</ShipmentManifest>
"""


# 2. Invalid Nesting (Structural Error -> Corrupt)
# The </Item> tag is outside the <Items> container.
invalid_nesting = """
<ShipmentManifest id="SH-78902">
 <ShippingDate>2025-09-12</ShippingDate>
 <Carrier>Standard Logistics</Carrier>
 <Items>
   <Item SKU="HW-303">
     <Description>Reinforced Gearbox</Description>
     <Quantity>20</Quantity>
 </Items></Item>
</ShipmentManifest>
"""


# 3. Unescaped Special Characters (Data Quality Error -> Rescued)
# The '&' character should be escaped as '&amp;'.
unescaped_char = """
<ShipmentManifest id="SH-78903">
 <ShippingDate>2025-09-13</ShippingDate>
 <Carrier>ACME Shipping & Logistics</Carrier>
 <Items>
   <Item SKU="HW-404">
     <Description>Hydraulic Press</Description>
     <Quantity>1</Quantity>
   </Item>
 </Items>
</ShipmentManifest>
"""


# 4. Data Type Mismatch (Data Quality Error -> Rescued)
# 'Quantity' is 'Fifty' but the schema expects an integer.
datatype_mismatch = """
<ShipmentManifest id="SH-78904">
 <ShippingDate>2025-09-14</ShippingDate>
 <Carrier>Next Day Air</Carrier>
 <Items>
   <Item SKU="HW-505">
     <Description>Precision Bearing Set</Description>
     <Quantity>Fifty</Quantity>
   </Item>
 </Items>
</ShipmentManifest>
"""


# 5. Unexpected Attribute (Schema Drift -> Rescued)
# The 'status' attribute is not defined in the schema.
unexpected_attribute = """
<ShipmentManifest id="SH-78905">
 <ShippingDate>2025-09-15</ShippingDate>
 <Carrier>Global Transport</Carrier>
 <Items>
   <Item SKU="SW-606" status="backordered">
     <Description>Firmware Update Key</Description>
     <Quantity>200</Quantity>
   </Item>
 </Items>
</ShipmentManifest>
"""


# 6. Mismatched Closing Tag (Structural Error -> Corrupt)
# The <Description> tag is incorrectly closed with </Desc>.
mismatched_tag = """
<ShipmentManifest id="SH-98765">
 <ShippingDate>2025-09-12</ShippingDate>
 <Carrier>Rapid Couriers</Carrier>
 <Items>
   <Item SKU="HW-707">
     <Description>Ergonomic Mouse</Desc>
     <Quantity>50</Quantity>
   </Item>
 </Items>
</ShipmentManifest>
"""


# 7. Attribute Value Without Quotes (Structural Error -> Corrupt)
# The SKU attribute's value is not enclosed in quotes.
unquoted_attribute = """
<ShipmentManifest id="SH-98766">
 <ShippingDate>2025-09-13</ShippingDate>
 <Carrier>Metro Deliveries</Carrier>
 <Items>
   <Item SKU=HW-808>
     <Description>Noise-Cancelling Headphones</Description>
     <Quantity>25</Quantity>
   </Item>
 </Items>
</ShipmentManifest>
"""


# Write all test files to the input directory
files = {
   "1_malformed_syntax.xml": malformed_syntax,
   "2_invalid_nesting.xml": invalid_nesting,
   "3_unescaped_char.xml": unescaped_char,
   "4_datatype_mismatch.xml": datatype_mismatch,
   "5_unexpected_attribute.xml": unexpected_attribute,
   "6_mismatched_tag.xml": mismatched_tag,
   "7_unquoted_attribute.xml": unquoted_attribute
}


for filename, content in files.items():
   dbutils.fs.put(f"{input_path}{filename}", content, overwrite=True)


print(f"Created {len(files)} XML test files in: {input_path}")


Scenario 1: Rescue Mode - Data Quality Issues

Rescue mode detects schema deviations but misses structural parsing errors.

# Define the strict schema, making key fields non-nullable to enforce validation
schema_rescue_only = StructType([
    StructField("_id", StringType(), True),
    StructField("ShippingDate", StringType(), True),
    StructField("Carrier", StringType(), True),
    StructField("Items", StructType([
        StructField("Item", ArrayType(StructType([
            StructField("_SKU", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), False) # Non-nullable
        ])), True)
    ]), True)
])

# Configure AutoLoader for RESCUE mode only
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml").option("rowTag", "ShipmentManifest")
  .option("cloudFiles.schemaEvolutionMode", "rescue") # Captures schema mismatches
  .schema(schema_rescue_only) # Enforce a strict schema
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_1/")
  .trigger(availableNow=True)
  .start(path + "output_1/")
  .awaitTermination()
)

# Analyze the rescued records
df_rescued = spark.read.load(path + "output_1/")
rescued_records = df_rescued.filter(col("_rescued_data").isNotNull())

print(f"=== SCENARIO 1: XML RESCUE MODE WITH STRICT SCHEMA ===")
print(f"Rescued records: {rescued_records.count()}")
display(rescued_records.select("_rescued_data", "file_name"))

Expected Output: Only files with data quality issues appear (4_datatype_mismatch.xml with "Fifty", 5_unexpected_attribute.xml with the undeclared "status" attribute, and 3_unescaped_char.xml, whose parsing issue results in partial data recovery).
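
Because _rescued_data is a plain JSON string, individual keys can be extracted for triage. A minimal sketch using get_json_object; the "$.Items" path is illustrative and depends on which fields were actually rescued in your records:

from pyspark.sql.functions import col, get_json_object

# Sketch: pull specific keys out of the _rescued_data JSON string.
# The "$.Items" path is illustrative; actual keys depend on which
# fields failed validation in each record.
triage = rescued_records.select(
    "file_name",
    get_json_object(col("_rescued_data"), "$.Items").alias("rescued_items"),
    "_rescued_data",
)
display(triage)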


Scenario 2: Corrupt Record Detection - Structural Failures

Corrupt record mode catches parsing failures but misses data quality issues.

# Configure AutoLoader with corrupt record detection for XML syntax issues
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml").option("rowTag", "ShipmentManifest")
  .option("cloudFiles.schemaLocation", path + "schema_2/")
  .option("cloudFiles.schemaHints", "_corrupt_record STRING") # Enable XML corruption capture
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_2/")
  .trigger(availableNow=True)
  .start(path + "output_2/")
  .awaitTermination()
)

# Analyze the corrupt records
df_corrupt = spark.read.load(path + "output_2/")
corrupt_records = df_corrupt.filter(col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 2: XML CORRUPT RECORD DETECTION ===")
print(f"Corrupt records: {corrupt_records.count()}")
display(corrupt_records.select("_corrupt_record", "file_name"))

Expected Output: Only files with structural XML parsing errors appear (1_malformed_syntax.xml, 2_invalid_nesting.xml, 6_mismatched_tag.xml, 7_unquoted_attribute.xml).
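
In production, you would typically route these records to a quarantine table for later reprocessing rather than only inspecting them interactively. A minimal sketch, assuming a quarantine location of your choosing:

# Sketch: persist structurally corrupt records to a quarantine Delta
# table, keeping the raw payload and source file for reprocessing.
# The quarantine path is a placeholder.
(corrupt_records
  .select("_corrupt_record", "file_name", "file_modification_time")
  .write
  .format("delta")
  .mode("append")
  .save(path + "quarantine/"))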


Scenario 3: Combined Approach - Comprehensive XML Quality Detection

A single configuration correctly identifies and categorizes all XML error types.

# Define a schema that includes the non-nullable fields AND the _corrupt_record column
schema_comprehensive = StructType([
    StructField("_id", StringType(), True),
    StructField("ShippingDate", StringType(), True),
    StructField("Carrier", StringType(), True),
    StructField("Items", StructType([
        StructField("Item", ArrayType(StructType([
            StructField("_SKU", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), False)
        ])), True)
    ]), True),
    StructField("_corrupt_record", StringType(), True) # Explicitly define for capture
])

# Configure AutoLoader for comprehensive quality detection
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml").option("rowTag", "ShipmentManifest")
  .schema(schema_comprehensive) # Apply complete schema with corruption handling
  .option("cloudFiles.schemaEvolutionMode", "rescue") # Handle data quality & schema drift
  .load(input_path)
  .select("*", "_metadata.*")
  .writeStream
  .format("delta")
  .option("checkpointLocation", path + "checkpoint_3/")
  .trigger(availableNow=True)
  .start(path + "output_3/")
  .awaitTermination()
)

# Comprehensive XML quality analysis - detects ALL seven corruption scenarios
df_combined = spark.read.load(path + "output_3/")
all_issues = df_combined.filter(col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull())

print(f"=== SCENARIO 3: COMPREHENSIVE XML QUALITY DETECTION ===")
print(f"Total XML records processed: {df_combined.count()}")
print(f"Records with quality issues: {all_issues.count()}")
print("Complete XML validation: All 7 corruption types detected\n")

# Show detailed breakdown by issue type and source file
print("--- Complete XML Quality Analysis ---")
display(all_issues.select("_rescued_data", "_corrupt_record", "file_name")
        .orderBy("file_name"))

# XML-specific detection summary
rescued_count = df_combined.filter(col("_rescued_data").isNotNull()).count()
corrupt_count = df_combined.filter(col("_corrupt_record").isNotNull()).count()
print(f"\n✓ Data Quality & Schema Drift → Rescued: {rescued_count}")
print(f"✓ Structural & Parsing Errors → Corrupt: {corrupt_count}")
print(f"✓ XML quality coverage: {((rescued_count + corrupt_count) / df_combined.count() * 100):.1f}%")


Implementation Strategy for XML Quality Monitoring

To implement robust XML quality detection:

  1. Define clear XML schemas with non-nullable fields for critical data validation
  2. Use rescue mode by default for automatic schema drift and data type issue handling
  3. Add corrupt record detection for comprehensive structural error capture
  4. Monitor both columns for complete quality visibility (a minimal monitoring sketch follows this list)
  5. Implement file-level tracking for precise issue resolution
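
As a starting point for item 4, here is a minimal sketch of a scheduled quality check against the output table; the alert threshold and table path are assumptions to adapt:

from pyspark.sql.functions import col

# Sketch of a scheduled quality check over the ingested table.
# ALERT_THRESHOLD and the output path are placeholders to adapt.
ALERT_THRESHOLD = 0.05  # alert if more than 5% of records have issues

df_check = spark.read.load(path + "output_3/")
total = df_check.count()
issues = df_check.filter(
    col("_rescued_data").isNotNull() | col("_corrupt_record").isNotNull()
).count()

ratio = issues / total if total else 0.0
print(f"Quality issue ratio: {ratio:.1%} ({issues}/{total})")
if ratio > ALERT_THRESHOLD:
    print("ALERT: XML quality issue rate exceeds threshold; investigate quarantined files.")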

Conclusion

AutoLoader's advanced XML quality detection capabilities make it indispensable for modern data lakes handling complex hierarchical data. 

The combination of rescue mode and corrupt record detection provides comprehensive coverage for XML-specific corruption scenarios, from syntax errors to schema violations.

By implementing the dual-detection approach demonstrated in this blog, you can ensure that your XML data ingestion pipelines are both resilient and auditable, capturing every type of data quality issue while maintaining operational continuity.

What's Next?

Ready to implement these data quality patterns in your own pipelines? Start by testing the provided code examples in your Databricks environment and adapt them to your specific XML schema requirements. 

For deeper implementation guidance, explore the AutoLoader Schema Evolution and Schema Inference documentation and the XML-specific AutoLoader options.
