
Load CSV files with slightly different schemas

MRTN
New Contributor III

I have a set of CSV files generated by a system, where the schema has evolved over the years. Some columns have been added, and at least one column has been renamed in newer files. Is there any way to elegantly load these files into a dataframe?

I have tried spark.read.csv() using different options. My next thought would be to load the individual files using pandas, possibly using applyInPandas.

Any thoughts or ideas?
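One pattern that can cope with this kind of drift, sketched here with placeholder paths and a hypothetical old_name/new_name rename rather than the real schema, is to read each file separately, reconcile the renamed column, and union the results by name:

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder file paths; in practice list them from the source directory.
paths = ["/data/export_2019.csv", "/data/export_2023.csv"]

dfs = []
for p in paths:
    df = spark.read.csv(p, header=True, inferSchema=True)
    # Hypothetical rename: newer files call the column "new_name" instead of "old_name".
    if "old_name" in df.columns:
        df = df.withColumnRenamed("old_name", "new_name")
    dfs.append(df)

# allowMissingColumns (Spark 3.1+) fills columns absent from older files with nulls.
combined = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)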

4 REPLIES

karthik_p
Esteemed Contributor

@Morten Stakkeland if I am not wrong, you want to handle schema changes from source to target. Can you please check schema evolution in Databricks?

Ajay-Pandey
Esteemed Contributor III

Hi @Morten Stakkeland,

Please refer to the blog below; it might help you:

Configure schema inference and evolution in Auto Loader | Databricks on AWS
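A minimal sketch along the lines of that page, with placeholder paths and table name:

(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # Auto Loader tracks the inferred schema and any new columns here
    .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.inferColumnTypes", "true")
    .load("<path_to_source_data>")
    .writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    # let the Delta sink add the new columns as they appear
    .option("mergeSchema", "true")
    .trigger(once=True)
    .toTable("<target_table>"))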

Ajay Kumar Pandey

MRTN
New Contributor III

Thanks for the input. I previously experimented with Auto Loader, but was stopped by the fact that the headers in my CSV files contain spaces and illegal characters. Hence the error:

AnalysisException: Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema.
Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'.
For more details, refer to https://docs.microsoft.com/azure/databricks/delta/delta-column-mapping
Or you can use alias to rename it.

Passing an option to set the column mapping mode to name did not resolve it:

(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .option("optimizeWrite", "True")
    .option("schemaEvolutionMode", "addNewColumns")
    .option("delta.columnMapping.mode", "name")
    .trigger(once=True)
    .toTable(table_name))

Creating the target table with column mapping mode 'name' prior to starting the stream did not help either, as I got error messages related to schema mismatch.
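For context, the property the error message refers to is a Delta table property rather than a stream writer option; per the column-mapping docs it is typically enabled on the target table roughly like this (table name is a placeholder):

spark.sql("""
  ALTER TABLE <table_name> SET TBLPROPERTIES (
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5',
    'delta.columnMapping.mode' = 'name'
  )
""")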

I could not think of any way to rename all my 80+ columns "on the fly".

MRTN
New Contributor III

For reference, for anybody struggling with the same issue: all online examples using Auto Loader are written as one block statement of the form:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")
)

The solution was to split this into three steps, as follows:

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>"))
 
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(","%28").replace(")","%29").replace("/","%2F"))
 
(df.writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))
