Read CSV files in Azure Databricks notebook, how to read data when columns in CSV files are in the wrong order

Mado
Valued Contributor II

I have a task to revise CSV ingestion in Azure Databricks. The current implementation uses the settings below:

 

from pyspark.sql.functions import input_file_name

# Auto Loader source: CSV files read with a user-defined schema
source_query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(defined_schema)
    .option("enforceSchema", "false")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("skipRows", 0)
    .option("header", True)
)

# Sink: append to a Delta table, keeping the source file name for traceability
result = (
    source_query.load(input_path)
    .withColumn("original_filename", input_file_name())
    .writeStream.format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
    .toTable(table_name)
)

I need to revise the CSV ingestion to handle the case where the columns in the CSV file are not in the correct order.

Assume that the CSV file has the following columns:

  • CUSTOMER ID (integer)

  • CUSTOMER NAME (string)

  • ORDER NUMBER (integer)

The delta table has been defined with the following schema:

  • customer_id (integer)

  • customer_name (string)

  • order_number (integer)

Note that the column names in the CSV source file and target delta table are not exactly the same. The "defined_schema" parameter in the above code is the schema of the delta table.

Now assume that I receive a malformed CSV file with the wrong column order. For example, the columns are ordered like this in the CSV file:

  • CUSTOMER ID (integer)

  • ORDER NUMBER (integer)

  • CUSTOMER NAME (string)

This means that the "ORDER NUMBER" data will be ingested into the "customer_name" column of the delta table.

I am looking for a solution to avoid writing data into the wrong column in the delta table.

One idea could be to write the data with saveAsTable, since it uses column names to find the correct column positions. However, because the schema is defined in the code above, the column names of the DataFrame in the notebook are not the column names from the CSV file; the information about the CSV column names is lost. Also, the column names in the CSV file differ from those in the target table. So this solution cannot be used.

Does anyone have an idea how to solve this issue?

If I set enforceSchema to True, does it fail ingestion when the column order is wrong in the CSV file?

Note: sometimes the data types in the delta table differ from those in the CSV source file, and I want to keep the option of defining the schema as well. For example, I may want to define the schema of the delta table as below, with all columns as string (a sketch of such a schema definition follows this list):

  • customer_id (string)

  • customer_name (string)

  • order_number (string)
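
For reference, a schema like this could be declared as a StructType and passed as defined_schema in the code above. This is just a minimal sketch; the field names follow the delta table definition in this post:

from pyspark.sql.types import StructType, StructField, StringType

# All columns declared as strings, matching the delta table column names above
defined_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("order_number", StringType(), True),
])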

 

1 ACCEPTED SOLUTION

Kaniz
Community Manager

Hi @Mado, to handle the issue of column order in CSV files, you could consider the following approach:

 

  1. Read the CSV file without enforcing a schema: This will allow Spark to infer the schema directly from the CSV file, preserving the column names from the CSV file.

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(input_path)
)

  2. Rename the columns to match the Delta table schema: You can rename the DataFrame columns to match the target Delta table schema. This step is necessary because the column names in the CSV file and the Delta table might not be the same.

df = (
    df.withColumnRenamed("CUSTOMER ID", "customer_id")
    .withColumnRenamed("CUSTOMER NAME", "customer_name")
    .withColumnRenamed("ORDER NUMBER", "order_number")
)

  3. Cast the columns to the desired data type: If the Delta table's data types differ from the CSV source file, you can cast the DataFrame columns to the desired data types.

from pyspark.sql.functions import col

df = (
    df.withColumn("customer_id", col("customer_id").cast("string"))
    .withColumn("customer_name", col("customer_name").cast("string"))
    .withColumn("order_number", col("order_number").cast("string"))
)

  4. Write the DataFrame to the Delta table: Finally, you can write the DataFrame to the Delta table. The data will be written to the correct columns because the DataFrame column names now match the Delta table schema.

df.write.format("delta").mode("append").save(delta_table_path)
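
As a follow-up sketch (not part of the original answer), the rename and cast steps can also be combined into a single select by name, so the column order in the source file no longer matters. It reuses the same variables from this thread (input_path, delta_table_path):

from pyspark.sql.functions import col

# Read with header=true so the CSV column names are preserved,
# then pick the columns by name, cast them, and rename them in one step
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(input_path)
    .select(
        col("CUSTOMER ID").cast("string").alias("customer_id"),
        col("CUSTOMER NAME").cast("string").alias("customer_name"),
        col("ORDER NUMBER").cast("string").alias("order_number"),
    )
)

df.write.format("delta").mode("append").save(delta_table_path)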

Regarding your question about the enforceSchema option: when it is set to true (the default), Spark forcibly applies the user-defined or inferred schema when reading the CSV files and ignores the headers. When it is set to false and header is true, Spark validates the column names in the CSV header against the schema and throws an exception if they do not match.


4 REPLIES

Mado
Valued Contributor II

@Kaniz 

Thanks for your help. 

I receive different CSV files, with different numbers of columns and different column names, for different target tables. It is not just one CSV with specific column names and data types. I am looking for a solution that covers all the tables that are populated from CSV files.
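
One way such a generic, name-based alignment could look (purely a hedged sketch, not something from this thread; the helper name align_csv_to_table and the lower-case/underscore naming convention are assumptions for illustration):

from pyspark.sql.functions import col

def align_csv_to_table(spark, input_path, table_name):
    """Read a CSV with its own header and align columns, by name, to table_name's schema."""
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .load(input_path)
    )

    # Normalise CSV header names, e.g. "CUSTOMER ID" -> "customer_id" (assumed convention)
    normalised = {c.lower().replace(" ", "_"): c for c in df.columns}

    target_schema = spark.table(table_name).schema
    missing = [f.name for f in target_schema if f.name not in normalised]
    if missing:
        raise ValueError(f"CSV is missing expected columns: {missing}")

    # Select the target columns by name (order no longer matters) and cast to the table's types
    return df.select(
        [col(normalised[f.name]).cast(f.dataType).alias(f.name) for f in target_schema]
    )

The aligned DataFrame could then be appended with aligned.write.format("delta").mode("append").saveAsTable(table_name).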

Mado
Valued Contributor II

Also, I am looking for a PySpark solution that works with both correct files and malformed files.
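
For what it's worth, a small self-contained check (with made-up sample data) illustrates that selecting columns by name makes the result independent of the column order in the source, which is exactly the property a malformed file with shuffled columns needs:

# Made-up sample data: one frame with the expected column order, one shuffled
correct = spark.createDataFrame(
    [(1, "Alice", 100)], ["customer_id", "customer_name", "order_number"]
)
shuffled = spark.createDataFrame(
    [(1, 100, "Alice")], ["customer_id", "order_number", "customer_name"]
)

target_cols = ["customer_id", "customer_name", "order_number"]

# After selecting by name, both frames expose the columns in the same order
assert correct.select(target_cols).columns == shuffled.select(target_cols).columns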
