01-03-2024 01:36 PM
I have a task to revise CSV ingestion in Azure Databricks. The current implementation uses the below settings:
source_query = ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") .schema(defined_schema) .option("enforceSchema", "false") .option("cloudFiles.schemaLocation", checkpoint_path) .option("skipRows", 0) .option("header", True) ) result = ( source_query.load(input_path) .withColumn("original_filename", input_file_name()) .writeStream.format("delta") .option("checkpointLocation", checkpoint_path) .trigger(once=True) .toTable(table_name) )
I need to revise the CSV ingestion to consider flexibility when columns in the CSV file are not in the correct order.
Assume that the CSV file has the below columns:
CUSTOMER ID (integer)
CUSTOMER NAME (string)
ORDER NUMBER (integer)
The delta table has been defined with the below schema:
customder_id (integer)
customer_name (string)
order_number (integer)
Note that the column names in the CSV source file and target delta table are not exactly the same. The "defined_schema" parameter in the above code is the schema of the delta table.
Now I assume that I received a malformed CSV file with the wrong column order. For example, the column order is like below in the CSV file:
CUSTOMER ID (integer)
ORDER NUMBER (integer)
CUSTOMER NAME (string)
It means that data of "ORDER NUMBER" will be ingested into the "customer_name" column in the delta table.
I am looking for a solution to avoid writing data into the wrong column in the delta table.
One idea can be using saveAsTable to write data as it uses the column names to find the correct column positions. But, since the schema is defined in the above code, the column names in the DataFrame in the notebook are not exactly the column names in the CSV file. In other words, information about column names in the CSV file is lost. Also, column names in the CSV file are different from the target table. So, this solution cannot be used.
Is there any idea how to solve this issue?
If I set enforceSchema to True, does it fail ingestion when the column order is wrong in the CSV file?
Note: Note that sometimes data-type of the delta table is different from the CSV source file. I want to have this option to define schema too. For example, I may want to define schema of delta table as below (all columns are string):
customder_id (string)
customer_name (string)
order_number (string)
01-04-2024 12:08 AM
Hi @Mado, To handle the issue of column order in CSV files, you could consider the following approach:
df = ( spark.read.format("csv") .option("header", "true") .load(input_path) )
1. Rename the columns to match the Delta table schema: You can rename the DataFrame columns to match the target Delta table schema. This step is necessary because the column names in the CSV file and the Delta table might not be the same.
df = df.withColumnRenamed("CUSTOMER ID", "customer_id")\ .withColumnRenamed("CUSTOMER NAME", "customer_name")\ .withColumnRenamed("ORDER NUMBER", "order_number")
2. Cast the columns to the desired data type: If the Delta table's data types differ from the CSV source file, you can cast the DataFrame columns to the desired data types.
from pyspark.sql.functions import col df = df.withColumn("customer_id", col("customer_id").cast("string"))\ .withColumn("customer_name", col("customer_name").cast("string"))\ .withColumn("order_number", col("order_number").cast("string"))
3. Write the DataFrame to the Delta table: Finally, you can write the DataFrame to the Delta table. The data will be written to the correct columns because the DataFrame column names now match the Delta table schema.
df.write.format("delta").mode("append").save(delta_table_path)
Regarding your question about the enforce schema option, Spark will enforce the user-defined schema when reading the CSV file if it's set to True. If the CSV file schema doesn’t match the user-defined schema, an exception will be thrown.
01-04-2024 12:08 AM
Hi @Mado, To handle the issue of column order in CSV files, you could consider the following approach:
df = ( spark.read.format("csv") .option("header", "true") .load(input_path) )
1. Rename the columns to match the Delta table schema: You can rename the DataFrame columns to match the target Delta table schema. This step is necessary because the column names in the CSV file and the Delta table might not be the same.
df = df.withColumnRenamed("CUSTOMER ID", "customer_id")\ .withColumnRenamed("CUSTOMER NAME", "customer_name")\ .withColumnRenamed("ORDER NUMBER", "order_number")
2. Cast the columns to the desired data type: If the Delta table's data types differ from the CSV source file, you can cast the DataFrame columns to the desired data types.
from pyspark.sql.functions import col df = df.withColumn("customer_id", col("customer_id").cast("string"))\ .withColumn("customer_name", col("customer_name").cast("string"))\ .withColumn("order_number", col("order_number").cast("string"))
3. Write the DataFrame to the Delta table: Finally, you can write the DataFrame to the Delta table. The data will be written to the correct columns because the DataFrame column names now match the Delta table schema.
df.write.format("delta").mode("append").save(delta_table_path)
Regarding your question about the enforce schema option, Spark will enforce the user-defined schema when reading the CSV file if it's set to True. If the CSV file schema doesn’t match the user-defined schema, an exception will be thrown.
01-04-2024 02:03 PM
Thanks for your help.
I receive different CSV files with different number of columns, and column names for different target tables. I mean, it is not only a CSV with specific column names and data types. I am looking for a solution that covers all tables that are being populated by CSV files.
01-04-2024 02:11 PM
Also, I am looking for a solution that works with both correct files and malformed files using PySpark.
01-18-2024 02:16 AM
Hi @Mado, If you have different CSV files for different tables in the same source directory, you can create di.... You can filter the filenames to consume by using the pathGlobFilter option on Auto Loader.
I hope this helps! Let me know if you have any other questions.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group