Read CSV files in Azure Databricks notebook, how to read data when columns in CSV files are in the w
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-03-2024 01:36 PM
I have a task to revise CSV ingestion in Azure Databricks. The current implementation uses the below settings:
source_query = ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") .schema(defined_schema) .option("enforceSchema", "false") .option("cloudFiles.schemaLocation", checkpoint_path) .option("skipRows", 0) .option("header", True) ) result = ( source_query.load(input_path) .withColumn("original_filename", input_file_name()) .writeStream.format("delta") .option("checkpointLocation", checkpoint_path) .trigger(once=True) .toTable(table_name) )
I need to revise the CSV ingestion to consider flexibility when columns in the CSV file are not in the correct order.
Assume that the CSV file has the below columns:
CUSTOMER ID (integer)
CUSTOMER NAME (string)
ORDER NUMBER (integer)
The delta table has been defined with the below schema:
customder_id (integer)
customer_name (string)
order_number (integer)
Note that the column names in the CSV source file and target delta table are not exactly the same. The "defined_schema" parameter in the above code is the schema of the delta table.
Now I assume that I received a malformed CSV file with the wrong column order. For example, the column order is like below in the CSV file:
CUSTOMER ID (integer)
ORDER NUMBER (integer)
CUSTOMER NAME (string)
It means that data of "ORDER NUMBER" will be ingested into the "customer_name" column in the delta table.
I am looking for a solution to avoid writing data into the wrong column in the delta table.
One idea can be using saveAsTable to write data as it uses the column names to find the correct column positions. But, since the schema is defined in the above code, the column names in the DataFrame in the notebook are not exactly the column names in the CSV file. In other words, information about column names in the CSV file is lost. Also, column names in the CSV file are different from the target table. So, this solution cannot be used.
Is there any idea how to solve this issue?
If I set enforceSchema to True, does it fail ingestion when the column order is wrong in the CSV file?
Note: Note that sometimes data-type of the delta table is different from the CSV source file. I want to have this option to define schema too. For example, I may want to define schema of delta table as below (all columns are string):
customder_id (string)
customer_name (string)
order_number (string)
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-04-2024 02:03 PM
Thanks for your help.
I receive different CSV files with different number of columns, and column names for different target tables. I mean, it is not only a CSV with specific column names and data types. I am looking for a solution that covers all tables that are being populated by CSV files.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-04-2024 02:11 PM
Also, I am looking for a solution that works with both correct files and malformed files using PySpark.

