Read CSV files in Azure Databricks notebook, how to read data when columns in CSV files are in the w

Mado
Valued Contributor II

I have a task to revise CSV ingestion in Azure Databricks. The current implementation uses the below settings:

 

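from pyspark.sql.functions import input_file_name

# defined_schema, checkpoint_path, input_path and table_name are set earlier in the notebook.
source_query = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(defined_schema)  # schema of the target Delta table
    .option("enforceSchema", "false")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("skipRows", 0)
    .option("header", True)
)

result = (
    source_query.load(input_path)
    # Keep the source file path of each row for traceability.
    .withColumn("original_filename", input_file_name())
    .writeStream.format("delta")
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)
    .toTable(table_name)
)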

I need to revise the CSV ingestion so that it still works when the columns in the CSV file are not in the expected order.

Assume that the CSV file has the below columns:

  • CUSTOMER ID (integer)

  • CUSTOMER NAME (string)

  • ORDER NUMBER (integer)

The delta table has been defined with the below schema:

  • customder_id (integer)

  • customer_name (string)

  • order_number (integer)

Note that the column names in the CSV source file and target delta table are not exactly the same. The "defined_schema" parameter in the above code is the schema of the delta table.

Now suppose I receive a malformed CSV file with the wrong column order. For example, the columns in the CSV file are ordered like this:

  • CUSTOMER ID (integer)

  • ORDER NUMBER (integer)

  • CUSTOMER NAME (string)

This means that the "ORDER NUMBER" data will be ingested into the "customer_name" column in the delta table.
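
To make the mix-up concrete, here is a small plain-Python illustration. It is not Auto Loader itself, only a sketch of what applying target column names by position does to a row; the sample file content is made up.

```python
import csv
import io

# Hypothetical sample file with ORDER NUMBER and CUSTOMER NAME swapped.
sample_csv = "CUSTOMER ID,ORDER NUMBER,CUSTOMER NAME\n1,1001,Alice\n"

# Target column names, in the order of the delta table schema.
target_columns = ["customder_id", "customer_name", "order_number"]

reader = csv.reader(io.StringIO(sample_csv))
header = next(reader)      # the CSV's own column names
first_row = next(reader)   # the first data row

# Positional mapping: the i-th value goes to the i-th target column,
# no matter what the header says.
by_position = dict(zip(target_columns, first_row))
print(by_position)
```

Here the order number ends up under "customer_name" and the customer name under "order_number", because the header is never consulted.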

I am looking for a solution to avoid writing data into the wrong column in the delta table.

One idea is to write the data with saveAsTable, since it uses column names to find the correct column positions. However, because the schema is defined in the above code, the column names of the DataFrame in the notebook come from that schema and not from the CSV file. In other words, the column names in the CSV file are lost. The column names in the CSV file are also different from those in the target table. So this solution cannot be used as it is.
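
A name-based approach would need an explicit link between the CSV header names and the table column names. The sketch below shows one possible shape of that link in plain Python. `HEADER_TO_TARGET` and `build_select_plan` are hypothetical names introduced only for illustration, and the sketch assumes the CSV header is available as a list of strings (for example, when the file is read with its header and without a predefined schema).

```python
# Hypothetical mapping from CSV header names to delta table column names.
HEADER_TO_TARGET = {
    "CUSTOMER ID": "customder_id",
    "CUSTOMER NAME": "customer_name",
    "ORDER NUMBER": "order_number",
}


def build_select_plan(csv_header, header_to_target, target_order):
    """Return (source_name, target_name) pairs in target-table order.

    Raises ValueError if the header has an unmapped column or lacks a
    column that the target table needs.
    """
    cleaned = [name.strip() for name in csv_header]
    unknown = [name for name in cleaned if name not in header_to_target]
    if unknown:
        raise ValueError(f"Unmapped CSV columns: {unknown}")
    target_to_source = {header_to_target[name]: name for name in cleaned}
    missing = [t for t in target_order if t not in target_to_source]
    if missing:
        raise ValueError(f"CSV has no column for: {missing}")
    return [(target_to_source[t], t) for t in target_order]


# Header of the malformed file: the last two columns are swapped.
plan = build_select_plan(
    ["CUSTOMER ID", "ORDER NUMBER", "CUSTOMER NAME"],
    HEADER_TO_TARGET,
    ["customder_id", "customer_name", "order_number"],
)
print(plan)
```

In PySpark, such a plan could then be applied by selecting each source column under its target name (for example with col(source).alias(target)) and casting to the desired type afterwards, so the physical column order in the file no longer matters.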

Does anyone have an idea how to solve this issue?

If I set enforceSchema to True, will the ingestion fail when the column order in the CSV file is wrong?
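
Whatever enforceSchema does in this case, the header order can also be checked explicitly before ingestion, so that a wrong order fails loudly. A minimal plain-Python sketch, assuming the beginning of the file is available as text; `header_matches`, `EXPECTED_HEADER` and the sample strings are hypothetical:

```python
import csv
import io

# Expected CSV header, in the order the ingestion relies on.
EXPECTED_HEADER = ["CUSTOMER ID", "CUSTOMER NAME", "ORDER NUMBER"]


def header_matches(csv_text, expected_header):
    """Compare the first CSV line with the expected header, order included.

    Returns (matches, actual_header).
    """
    reader = csv.reader(io.StringIO(csv_text))
    actual = [name.strip() for name in next(reader, [])]
    return actual == list(expected_header), actual


good_file = "CUSTOMER ID,CUSTOMER NAME,ORDER NUMBER\n1,Alice,1001\n"
bad_file = "CUSTOMER ID,ORDER NUMBER,CUSTOMER NAME\n1,1001,Alice\n"

good_ok, _ = header_matches(good_file, EXPECTED_HEADER)
bad_ok, bad_header = header_matches(bad_file, EXPECTED_HEADER)
print(good_ok, bad_ok, bad_header)
```

A file that fails this check could be rejected or routed to a separate path instead of being loaded by position.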

Note: Sometimes the data types in the delta table are different from those in the CSV source file, so I want to keep the option to define the schema too. For example, I may want to define the schema of the delta table as below (all columns are strings):

  • customder_id (string)

  • customer_name (string)

  • order_number (string)

 

2 REPLIES

Mado
Valued Contributor II

@Kaniz_Fatma 

Thanks for your help. 

I receive many different CSV files, with different numbers of columns and different column names, for different target tables. In other words, this is not about a single CSV with specific column names and data types. I am looking for a solution that covers all tables that are populated from CSV files.
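
To illustrate what I mean by a generic solution: the header names could be normalized by rule and matched against the target columns, with explicit overrides only where the rule does not fit. This is only a plain-Python sketch under that assumption; `normalize`, `match_columns` and the override entry are hypothetical.

```python
import re


def normalize(name):
    """Lower-case a header name and turn runs of other characters into '_'."""
    return re.sub(r"[^0-9a-z]+", "_", name.strip().lower()).strip("_")


def match_columns(csv_header, target_columns, overrides=None):
    """Map CSV header names to target column names.

    Returns (matched, unmatched): a dict of source -> target names, and
    the list of header names that could not be matched.
    """
    overrides = overrides or {}
    targets = set(target_columns)
    matched, unmatched = {}, []
    for source in csv_header:
        candidate = overrides.get(source, normalize(source))
        if candidate in targets:
            matched[source] = candidate
        else:
            unmatched.append(source)
    return matched, unmatched


header = ["CUSTOMER ID", "ORDER NUMBER", "CUSTOMER NAME"]
targets = ["customder_id", "customer_name", "order_number"]

# Rule only: "CUSTOMER ID" normalizes to "customer_id", which is not a target.
rule_matched, rule_unmatched = match_columns(header, targets)

# With one override for the column whose table name differs from the rule.
full_matched, full_unmatched = match_columns(
    header, targets, overrides={"CUSTOMER ID": "customder_id"}
)
print(rule_unmatched, full_matched)
```

The same two functions would work for any table, and only the override dictionary would differ per table.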

Mado
Valued Contributor II

Also, I am looking for a PySpark solution that works with both correct and malformed files.
