08-04-2025 02:49 AM - edited 08-04-2025 02:53 AM
The Problem:
A common data engineering challenge is reading a directory of CSV files where the schemas are inconsistent. For example, some files might have columns in a different order, or be missing certain columns altogether. The standard behavior of Spark often leads to silent data corruption (column shifting) in these cases.
The Reproducible Example:
Here is a simple, self-contained example you can run in any Databricks notebook. It creates two CSV files in DBFS:
file_A.csv: The "correct" file with 3 columns: ConversationID,UserID,AgentName.
file_B.csv: The "problem" file with only 2 columns, which are also reordered and missing UserID: AgentName,ConversationID.
# --- 1. Setup: Create our test CSV files in DBFS ---
# Define file paths
dbfs_path = "/tmp/community_discussion_data/"
file_path_A = f"{dbfs_path}file_A.csv"
file_path_B = f"{dbfs_path}file_B.csv"
# Clean up previous runs
dbutils.fs.rm(dbfs_path, recurse=True)
# File A: The "complete" file
header_A = "ConversationID,UserID,AgentName"
data_A = '"conv_a1",101,"Alice"'
dbutils.fs.put(file_path_A, f"{header_A}\n{data_A}", overwrite=True)
print(f"Created File A: {file_path_A}")
# File B: The "problem" file (missing UserID, columns reordered)
header_B = "AgentName,ConversationID"
data_B = '"Bob","conv_b1"'
dbutils.fs.put(file_path_B, f"{header_B}\n{data_B}", overwrite=True)
print(f"Created File B: {file_path_B}")First, let's read the directory without any special options. My expectation is that Spark will infer the schema from the first file (file_A.csv) and then positionally read the second file, causing a data shift.
# Reading without the mergeSchema option
df_without_merge = spark.read.csv(
dbfs_path,
header=True,
inferSchema=False
)
print("Result WITHOUT mergeSchema:")
df_without_merge.show()
+--------------+-------+---------+
|ConversationID| UserID|AgentName|
+--------------+-------+---------+
| conv_a1| 101| Alice|
| Bob|conv_b1| null|
+--------------+-------+---------+
Now, let's run the exact same read but add .option("mergeSchema", "true"). According to the official Apache Spark documentation, this option should only apply to Parquet and ORC files.
# Reading WITH the mergeSchema option
df_with_merge = spark.read.option("mergeSchema", "true").csv(
dbfs_path,
header=True
)
print("Result WITH mergeSchema:")
df_with_merge.show()
+--------------+-------+---------+
|ConversationID| UserID|AgentName|
+--------------+-------+---------+
| conv_a1| 101| Alice|
| conv_b1| null| Bob|
+--------------+-------+---------+
Analysis: This result is perfect! Spark correctly aligned the data by column name, placing "Bob" in AgentName and "conv_b1" in ConversationID, while correctly identifying that UserID was missing for that row and filling it with null.
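As a quick sanity check (an illustrative snippet, reusing the df_with_merge DataFrame from the read above), the row that came from file_B should land in the right columns with a null UserID:
# Illustrative sanity check: verify the file_B row was aligned by column name
row_b = df_with_merge.filter("ConversationID = 'conv_b1'").collect()[0]
assert row_b["AgentName"] == "Bob"
assert row_b["UserID"] is None
print("Row from file_B aligned by name, UserID is null as expected")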
This behavior is fantastic, but it leads to a few key questions:
Is this an officially supported, enhanced feature of the Databricks CSV reader? The result is clearly what we'd want, but it seems to contradict the public Apache Spark documentation (see the DataFrameReader docs, which don't list mergeSchema as a CSV option).
Is this the recommended best practice on Databricks for ingesting CSVs with inconsistent schemas? Should we be confident in using this in production over more complex patterns like manually grouping files and unioning them?
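For context, the more manual pattern I'm comparing against would look roughly like this (an illustrative sketch reusing the two files created above; unionByName with allowMissingColumns=True requires Spark 3.1+):
from functools import reduce

# Illustrative alternative: read each file with its own header, then align by
# column name. allowMissingColumns=True fills absent columns with null.
df_a = spark.read.csv(file_path_A, header=True)
df_b = spark.read.csv(file_path_B, header=True)
df_manual = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    [df_a, df_b],
)
df_manual.show()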
09-29-2025 10:05 AM
Hey @JaydeepKhatri here are some helpful points to consider:
Is this an officially supported, enhanced feature of the Databricks CSV reader?
Based on internal research, this appears to be an undocumented "feature" of Spark running on Databricks. Anecdotally, people are using it without major issues, but since it isn't officially documented, proceed with caution.
Is this the recommended best practice on Databricks for ingesting CSVs with inconsistent schemas?
A better approach is to use Auto Loader, which reads files from cloud storage and provides schema evolution options. Specifically, look into the schemaEvolutionMode option.
Documentation: Schema evolution with Auto Loader
Example code:
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # A schema location is required so Auto Loader can track the inferred/evolved schema
    .option("cloudFiles.schemaLocation", "s3://path/to/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://path/to/files"))
Key distinctions between mergeSchema and Auto Loader schema evolution:
mergeSchema: a Delta Lake option used during reads/writes to Delta tables.
Auto Loader schema evolution: configuration options (cloudFiles.schemaEvolutionMode) for handling new columns while streaming in CSV/JSON/Parquet.
Direct answer: mergeSchema is not part of the Auto Loader API. Auto Loader has its own schema evolution features. However, if you're ultimately writing the ingested data to Delta tables, you can use them together (see the sketch after this list):
Auto Loader detects schema drift and evolves the DataFrame schema.
mergeSchema ensures the Delta table evolves when the data is written.
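A rough sketch of that combination (illustrative only; the paths, checkpoint location, and table name below are placeholders):
# Illustrative sketch: Auto Loader evolves the schema on read, and mergeSchema
# on the Delta write lets the target table pick up new columns.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "s3://path/to/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://path/to/files")
    .writeStream
    .option("checkpointLocation", "s3://path/to/checkpoint")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("catalog.schema.target_table"))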
Hope this helps, Louis.
08-06-2025 03:42 AM
Very nice example and explanation, it helped me a lot, thanks!