Load CSV files with slightly different schemas

MRTN
New Contributor III

I have a set of CSV files generated by a system, where the schema has evolved over the years. Some columns have been added, and at least one column has been renamed in newer files. Is there any way to elegantly load these files into a dataframe?

I have tried spark.read.csv() using different options. My next thought would be to load the individual files using pandas, possibly using applyInPandas.

Any thoughts or ideas?
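To make the question concrete, here is a minimal, library-agnostic sketch of the underlying idea: map renamed columns to one canonical name, then union the rows by column name and fill missing columns with nulls. The mapping `RENAMED`, the helper names, and the sample headers are all hypothetical. They are not taken from the actual files.

```python
import csv
import io

# Hypothetical mapping from an old header name to its current name.
RENAMED = {"Temp (C)": "Temperature (C)"}

def read_rows(handle, renamed=RENAMED):
    """Yield one dict per CSV row, with old column names mapped to new ones."""
    for row in csv.DictReader(handle):
        yield {renamed.get(key, key): value for key, value in row.items()}

def combine(handles, renamed=RENAMED):
    """Union rows from several CSV sources; columns a file lacks become None."""
    rows, columns = [], []
    for handle in handles:
        for row in read_rows(handle, renamed):
            rows.append(row)
            for key in row:
                if key not in columns:
                    columns.append(key)
    return columns, [{c: r.get(c) for c in columns} for r in rows]

# Two in-memory "files": an old schema and a newer one with a renamed
# column and an added column (illustrative data only).
old_file = io.StringIO("id,Temp (C)\n1,20.5\n")
new_file = io.StringIO("id,Temperature (C),Humidity\n2,21.0,40\n")
columns, rows = combine([old_file, new_file])
```

In Spark, the same idea can be expressed by reading each schema variant separately, renaming the old column with `withColumnRenamed`, and combining the results with `DataFrame.unionByName(other, allowMissingColumns=True)` (available from Spark 3.1).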

4 REPLIES

karthik_p
Esteemed Contributor

@Morten Stakkeland If I am not mistaken, you want to handle schema changes between source and target. Could you check schema evolution in Databricks?

Ajay-Pandey
Esteemed Contributor III

Hi @Morten Stakkeland,

Please refer to the documentation page below, which might help you:

Configure schema inference and evolution in Auto Loader | Databricks on AWS

MRTN
New Contributor III

Thanks for the input. I previously experimented with Auto Loader, but was stopped by the fact that the headers in my CSV files contain spaces and illegal characters, hence the error:

AnalysisException: 
Found invalid character(s) among " ,;{}()\n\t=" in the column names of your
schema. 
Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'.
For more details, refer to https://docs.microsoft.com/azure/databricks/delta/delta-column-mapping
Or you can use alias to rename it.

Passing an option to set the column mapping mode to name did not resolve it:

    .writeStream
    .format("delta")
    .option("checkpointLocation",checkpoint_location)
    .option("optimizeWrite","True")
    .option("schemaEvolutionMode","addNewColumns")
    .option("delta.columnMapping.mode", "name")
    .trigger(once=True)
    .toTable(table_name))

Creating the target table with column mapping mode set to name before starting the stream did not help either, as I got error messages related to a schema mismatch.

I could not think of any way to rename all my 80+ columns "on the fly".

MRTN
New Contributor III

For reference, for anybody struggling with the same issue: all the online examples using Auto Loader are written as one block statement of the form:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>")
)

The solution was to split this into three parts (read, rename, write) as follows:

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>"))

# Rename every column so that it no longer contains spaces, parentheses, or slashes
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(", "%28").replace(")", "%29").replace("/", "%2F"))

# The parentheses let the chained calls span several lines
(df.writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))
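
The rename loop above only handles spaces, parentheses, and slashes. As a hedged variation, the sketch below covers every character listed in the AnalysisException quoted earlier in this thread (`" ,;{}()\n\t="`) by replacing each one with an underscore. The helper names are hypothetical, and replacing with underscores instead of percent-encoding is an assumption of this sketch, not the approach used in the post.

```python
import re

# Characters named in the AnalysisException quoted earlier in this thread.
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")

def sanitize_column_name(name):
    """Replace each character from the error message with an underscore."""
    return INVALID_CHARS.sub("_", name)

def sanitize_columns(names):
    """Sanitize a list of names, adding a numeric suffix when two names collide.

    This is a simple scheme: it does not guard against a suffixed name
    colliding with yet another existing column.
    """
    seen = {}
    result = []
    for name in names:
        clean = sanitize_column_name(name)
        count = seen.get(clean, 0)
        seen[clean] = count + 1
        result.append(clean if count == 0 else f"{clean}_{count}")
    return result

# Illustrative headers only; not taken from the actual files.
headers = ["Flow rate (m3/h)", "Tank;level", "a b", "a_b"]
clean_headers = sanitize_columns(headers)
```

With a DataFrame, all columns could then be renamed in one step with `df = df.toDF(*sanitize_columns(df.columns))`. Note that `/` does not appear in the error message's list, so this sketch leaves it untouched.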
