cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

ignoreCorruptFiles behavior with CSV and COPY INTO

N_M
Contributor

Hi

I'm using the COPY INTO command to insert new data (in form of CSVs) into an already existing table.

The SQL query takes care of the conversion of the fields to the target table schema (well, there isn't other way to do that), and schema update is not allowed. I would like the job to process all files matching the pattern, and to not fail in case some files have wrong schema (they are supposed to be investigated manually later).

Now, I thought that the ignoreCorruptFiles = true would do what I want, but whenever I test it with 3 test files (1 correct, 1 with a wrong line, 1 with a wrong formatted datetime), the job fails with error (NULL Constraint violated).

The error itself is clear, as the conversion of 2 rows will fail setting some input fields to NULL, which is not allowed for specific fields. However, what happen is that not even the file #1(the correct one) is being written.

Probably it all goes around the definition of corruption for CSVs (which has nothing to do with the schema definition), but can anyone clarify the meaning of ignoreCorruptFiles?
And is it possible to achieve what I want?  

 

1 ACCEPTED SOLUTION

Accepted Solutions

N_M
Contributor

I actually found an option that could solve the newline issue I mentioned in my previous post:

setting spark.sql.csv.parser.columnPruning.enabled to false with

spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", False)

will consider malformed rows also partial rows (i.e, with a wrong number of separators), meaning that missing fields are NOT automatically filled with nulls (triggering schema errors only in case nullability is not enforced).

The default = True has been set from spark 2.4 (Spark SQL Upgrading Guide - Spark 2.4.0 Documentation (apache.org)) , but I can't really find any detailed explanation of the other consequences.

 

View solution in original post

3 REPLIES 3

N_M
Contributor

hi @Retired_mod  thanks for the reply. I'm aware of the possibility to perform checks and error handling to handle such situation. However, this is not optimal:

  • Perform checks on top of a file and then writing it means reading it 2 times. So a flow with this kind of approach is 2 times slower and more expensive. Assuming that errors are "rare", a fix-after-error approach is more preferrable and easier to handle. 
  • As we saw, COPY INTO will fail the full run in case 1 row in 1 file is malformed. Spark standard behavior is to fill up with nulls missing values/fields/lines etc, so a file that underwent some random truncation or contamination (or even a file truncated due to a IO error) will be read smoothly, and only a schema error is thrown, which would make a full run fail with very little log.

This issue automatically implies that a CSV can **never** be considered corrupted, as by definition is always readable. I would suggest to throw a warning in case "ignoreCorruptFiles" is enabled with CSVs, at least.

An alternative approach could be to deal the malformed rows with columnNameOfCorruptRecord . I didn't test it with COPY INTO, but as an approach I consider it very intrusive, as it introduces a new element in the schema and a full new flow to deal with them, whereas a manual or custom fix-after-error approach (again, given that we expect them to be very rare) would be more easily maintainable.

TL;DR a sort of ignoreMalformedFiles option with a meaning log info would save the situation in case COPY INTO is used with CSVs. This would allow the query to run over multiple files, and to skip schema unexpected files, which can be dealt and fix later.
Is there something similar implemented or planned?

 

N_M
Contributor

Hi @Retired_mod 

thank you spending time on this :).

The DROPMALFORMED option will silently ignore and drop malformed lines, which is not really an applicable option in a pipeline, at least not in all the use cases I have worked in so far.

A potential solution could be to use the badrecordspath (Handle bad records and files | Databricks on AWS), which, again, process all the files and silently write out bad records in jsons.
I'm not sure it works with COPY INTO, but the point is that I want a *single* file to fully fail if a malformed line is found, and thus a flow to achieve this should

  • check of the existence of the bad records files (written using a timestamp as name...).
  • get the filename of the offended line(s)
  • revert the writing of that single file (which is not possible with simple commands)

One could also use the additional column "rescuedDataColumn" (Read and write to CSV files | Databricks on AWS) (again, I have to check with COPY INTO), but it wouldn't solve everything because you cannot revert the wrong file when it happens.

You may ask "why do you want a full file to fail in case of a single error?", which is what is triggering so many issues and questions here. The answer is that in my case

  • wrong files are rare and errors should be investigated (and reported). The safest thing is to considered a faulted file as not reliable
  • an error like a newline in a field will likely trigger 1 error line and not 2. The first portion of the line is "usually" considered correct (with the rest of the line filled with Nones), and loaded into the final table, whereas the second part is considered faulty. It's not possible to connect the two, and one should ignore the first part as well.

All the issues above can be solved using algos that read the files more than once (e.g. file check and then writing), and/or adding many layers of complexity to the COPY INTO command... or with the implementation of a "ignoreMalformedFiles" with some meaningful and easy log. That's why I was looking for a solution like this.

 

N_M
Contributor

I actually found an option that could solve the newline issue I mentioned in my previous post:

setting spark.sql.csv.parser.columnPruning.enabled to false with

spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", False)

will consider malformed rows also partial rows (i.e, with a wrong number of separators), meaning that missing fields are NOT automatically filled with nulls (triggering schema errors only in case nullability is not enforced).

The default = True has been set from spark 2.4 (Spark SQL Upgrading Guide - Spark 2.4.0 Documentation (apache.org)) , but I can't really find any detailed explanation of the other consequences.

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group