Load CSV files with slightly different schemas
04-04-2023 12:22 PM
I have a set of CSV files generated by a system, where the schema has evolved over the years. Some columns have been added, and at least one column has been renamed in newer files. Is there any way to elegantly load these files into a dataframe?
I have tried spark.read.csv() using different options. My next thought would be to load the individual files using pandas, possibly using applyInPandas.
Any thoughts or ideas?
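To illustrate what I mean, here is a rough sketch of the per-file approach I was considering (paths and the renamed column are placeholders, not my real schema):

import functools

# Hypothetical sketch: read each file separately, align the renamed column,
# then union by name so columns missing from older files become nulls.
paths = [f.path for f in dbutils.fs.ls("<path_to_source_data>") if f.path.endswith(".csv")]

dfs = []
for p in paths:
    df = spark.read.option("header", "true").csv(p)
    if "old_column_name" in df.columns:  # placeholder for the column that was renamed
        df = df.withColumnRenamed("old_column_name", "new_column_name")
    dfs.append(df)

combined = functools.reduce(
    lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs
)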
Labels:
- CSV
- Schema evolution
04-04-2023 12:24 PM
@Morten Stakkeland If I am not mistaken, you want to handle schema changes from source to target. Could you please look into schema evolution in Databricks?
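For example, Delta's schema evolution on write can pick up newly added columns automatically. A minimal sketch, with placeholder names:

# Illustrative only: appending with mergeSchema lets Delta add columns that
# appear in the incoming data to the target table's schema.
(df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("<target_table>"))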
04-04-2023 10:28 PM
Hi @Morten Stakkeland,
Please refer to the blog below, which might help:
Configure schema inference and evolution in Auto Loader | Databricks on AWS
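The gist of that article, as a rough sketch (paths and column hints are placeholders), is to let Auto Loader infer and evolve the schema, optionally with hints for columns whose types you already know:

# Sketch based on the Auto Loader schema inference/evolution docs.
# cloudFiles.schemaLocation stores the inferred schema between runs,
# inferColumnTypes infers types instead of defaulting everything to string,
# and schemaHints (optional) pins the types of known columns.
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", "id bigint, event_time timestamp")  # hypothetical columns
    .load("<path_to_source_data>"))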
04-05-2023 12:39 AM
Thanks for the input. I previously experimented with Auto Loader, but was stopped by the fact that the headers in my CSV files contain spaces and illegal characters. Hence the error:
AnalysisException: Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema.
Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'.
For more details, refer to https://docs.microsoft.com/azure/databricks/delta/delta-column-mapping
Or you can use alias to rename it.
Passing an option to set the column mapping mode to name did not resolve it:
(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .option("optimizeWrite", "True")
    .option("schemaEvolutionMode", "addNewColumns")
    .option("delta.columnMapping.mode", "name")
    .trigger(once=True)
    .toTable(table_name))
Creating the target table with column mapping mode 'name' prior to starting the stream did not help either, as I got error messages related to schema mismatch.
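For reference, the pre-created table looked roughly like this (table and column names are placeholders); column mapping requires the newer Delta reader/writer protocol versions:

# Hypothetical sketch of the pre-created target table with column mapping enabled,
# so that headers with spaces and parentheses are allowed as column names.
spark.sql("""
    CREATE TABLE IF NOT EXISTS <target_table> (
        `Column One` STRING,
        `Column (Two)` STRING
    )
    USING DELTA
    TBLPROPERTIES (
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5',
        'delta.columnMapping.mode' = 'name'
    )
""")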
I could not think of any way to rename all my 80+ columns "on the fly".
04-12-2023 01:08 AM
For reference, for anybody struggling with the same issues: all online examples using Auto Loader are written as a single block statement of the form:
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # The schema location directory keeps track of your data schema over time
    .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
    .load("<path_to_source_data>")
    .writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .start("<path_to_target>")
)
The solution was to split this into three steps, as follows:
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # The schema location directory keeps track of your data schema over time
    .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
    .load("<path_to_source_data>"))

# Replace characters that Delta does not allow in column names
for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(", "%28").replace(")", "%29").replace("/", "%2F"))

(df.writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .start("<path_to_target>"))
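If you prefer to avoid the loop of withColumnRenamed calls, a single select with aliases does the same renaming in one projection (same placeholder replacements as above):

# Equivalent renaming step expressed as one select instead of repeated withColumnRenamed calls.
def sanitize(name):
    return name.replace(" ", "_").replace("(", "%28").replace(")", "%29").replace("/", "%2F")

df = df.select([df[c].alias(sanitize(c)) for c in df.columns])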