I have a task to revise CSV ingestion in Azure Databricks. The current implementation uses the settings below:
from pyspark.sql.functions import input_file_name

# Auto Loader stream that reads CSV files using a user-defined schema
source_query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(defined_schema)
    .option("enforceSchema", "false")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("skipRows", 0)
    .option("header", True)
)
# Load the files, record the source file name, and write to the Delta table
result = (
    source_query.load(input_path)
    .withColumn("original_filename", input_file_name())
    .writeStream.format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
    .toTable(table_name)
)
I need to make the CSV ingestion robust to files whose columns are not in the expected order.
Assume that the CSV file has the columns below:
CUSTOMER ID (integer)
CUSTOMER NAME (string)
ORDER NUMBER (integer)
The Delta table is defined with the schema below:
customder_id (integer)
customer_name (string)
order_number (integer)
Note that the column names in the CSV source file and the target Delta table are not exactly the same. The "defined_schema" variable in the code above is the schema of the Delta table.
Now suppose I receive a malformed CSV file with the wrong column order. For example, the columns in the CSV file are ordered as below:
CUSTOMER ID (integer)
ORDER NUMBER (integer)
CUSTOMER NAME (string)
This means that the "ORDER NUMBER" data will be ingested into the "customer_name" column of the Delta table.
I am looking for a way to avoid writing data into the wrong column of the Delta table.
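To make the problem concrete, here is a minimal plain-Python sketch (not Spark code, and not a claim about Spark's internals) of what happens when values are assigned to the target columns purely by position. The sample row and the `assign_by_position` helper are made up for illustration.

```python
# Target columns in the order used by defined_schema (names from the Delta table).
TARGET_COLUMNS = ["customder_id", "customer_name", "order_number"]


def assign_by_position(values):
    """Pair each value with the target column at the same position."""
    return dict(zip(TARGET_COLUMNS, values))


# Hypothetical row from the malformed file, whose column order is:
# CUSTOMER ID, ORDER NUMBER, CUSTOMER NAME
malformed_row = ["17", "9001", "Alice"]
mapped = assign_by_position(malformed_row)
print(mapped)
# {'customder_id': '17', 'customer_name': '9001', 'order_number': 'Alice'}
```

The order number ends up under "customer_name" and the customer name under "order_number", which is exactly the situation I want to prevent.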
One idea is to write the data with saveAsTable, since it matches columns by name rather than by position. But because the schema is defined explicitly in the code above, the DataFrame column names in the notebook come from that schema, not from the CSV file. In other words, the column names from the CSV header are lost. In addition, the column names in the CSV file differ from those in the target table. So this approach cannot be used.
Is there a way to solve this issue?
If I set enforceSchema to True, will ingestion fail when the column order in the CSV file is wrong?
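The name-based matching I have in mind would look roughly like the sketch below, again in plain Python rather than Spark. It assumes that the CSV header row is still available and that an explicit source-to-target name mapping (the hypothetical `HEADER_TO_TARGET` dictionary) is maintained by hand. Neither assumption holds in my current code.

```python
# Hypothetical mapping from CSV header names to Delta table column names.
HEADER_TO_TARGET = {
    "CUSTOMER ID": "customder_id",
    "CUSTOMER NAME": "customer_name",
    "ORDER NUMBER": "order_number",
}


def assign_by_name(header, values):
    """Map each value to its target column using the CSV header, not the position."""
    unknown = [name for name in header if name not in HEADER_TO_TARGET]
    if unknown:
        # Reject files whose header contains columns we do not know how to map.
        raise ValueError(f"Unexpected CSV columns: {unknown}")
    return {HEADER_TO_TARGET[name]: value for name, value in zip(header, values)}


# Header and row from the malformed file (hypothetical values).
header = ["CUSTOMER ID", "ORDER NUMBER", "CUSTOMER NAME"]
row = ["17", "9001", "Alice"]
by_name = assign_by_name(header, row)
print(by_name)
# {'customder_id': '17', 'order_number': '9001', 'customer_name': 'Alice'}
```

With the header available, the column order in the file no longer matters, and a file with unexpected column names is rejected instead of being loaded into the wrong columns.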
Note: sometimes the data types in the Delta table differ from those in the CSV source file, so I also want to keep the option of defining the schema myself. For example, I may want to define the schema of the Delta table as below (all columns are strings):
customder_id (string)
customer_name (string)
order_number (string)