01-05-2024 08:10 AM
Hi
I'm using the COPY INTO command to insert new data (in form of CSVs) into an already existing table.
The SQL query takes care of the conversion of the fields to the target table schema (well, there isn't other way to do that), and schema update is not allowed. I would like the job to process all files matching the pattern, and to not fail in case some files have wrong schema (they are supposed to be investigated manually later).
Now, I thought that the ignoreCorruptFiles = true would do what I want, but whenever I test it with 3 test files (1 correct, 1 with a wrong line, 1 with a wrong formatted datetime), the job fails with error (NULL Constraint violated).
The error itself is clear, as the conversion of 2 rows will fail setting some input fields to NULL, which is not allowed for specific fields. However, what happen is that not even the file #1(the correct one) is being written.
Probably it all goes around the definition of corruption for CSVs (which has nothing to do with the schema definition), but can anyone clarify the meaning of ignoreCorruptFiles?
And is it possible to achieve what I want?
01-09-2024 05:33 AM
I actually found an option that could solve the newline issue I mentioned in my previous post:
setting spark.sql.csv.parser.columnPruning.enabled to false with
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", False)
will consider malformed rows also partial rows (i.e, with a wrong number of separators), meaning that missing fields are NOT automatically filled with nulls (triggering schema errors only in case nullability is not enforced).
The default = True has been set from spark 2.4 (Spark SQL Upgrading Guide - Spark 2.4.0 Documentation (apache.org)) , but I can't really find any detailed explanation of the other consequences.
01-05-2024 09:13 AM
Hi @N_M, The ignoreCorruptFiles option in SQL is used to ignore corrupt files while reading data from files.
When set to true, the SQL jobs will continue to run when encountering corrupted files and the conten.... However, the definition of a “corrupt file” in this context typically refers to issues like unreadable or missing data within the file, not schema mismatches.
For handling CSVs with varying schemas, you might consider a few strategies:
Schema Mapping: Maintain a mapping configuration for each customer’s schema. This configuration can map/transform the customer schema to a consistent internal schema.
Error Handling: Implement robust error checking to ensure the files don’t come in malformed.
Log Analysis: If you have the ignoreCorruptFiles option set to true, Spark will log corrupted files .... You could potentially use these logs to identify and manually investigate files with schema issues.
Data Import Tools: Depending on your SQL server, you can use built-in tools to import and convert CS.... These tools often provide options for handling schema mismatches.
01-08-2024 01:26 AM
hi @Kaniz_Fatma thanks for the reply. I'm aware of the possibility to perform checks and error handling to handle such situation. However, this is not optimal:
This issue automatically implies that a CSV can **never** be considered corrupted, as by definition is always readable. I would suggest to throw a warning in case "ignoreCorruptFiles" is enabled with CSVs, at least.
An alternative approach could be to deal the malformed rows with columnNameOfCorruptRecord . I didn't test it with COPY INTO, but as an approach I consider it very intrusive, as it introduces a new element in the schema and a full new flow to deal with them, whereas a manual or custom fix-after-error approach (again, given that we expect them to be very rare) would be more easily maintainable.
TL;DR a sort of ignoreMalformedFiles option with a meaning log info would save the situation in case COPY INTO is used with CSVs. This would allow the query to run over multiple files, and to skip schema unexpected files, which can be dealt and fix later.
Is there something similar implemented or planned?
01-09-2024 12:31 AM
Hi @N_M, In Apache Spark™, there is an option to handle malformed records when reading CSV files. You can use the mode option with the value DROPMALFORMED. This option will skip the lines with an incorrect number of delimiters or that don’t match the schem....
01-09-2024 01:09 AM
Hi @Kaniz_Fatma
thank you spending time on this :).
The DROPMALFORMED option will silently ignore and drop malformed lines, which is not really an applicable option in a pipeline, at least not in all the use cases I have worked in so far.
A potential solution could be to use the badrecordspath (Handle bad records and files | Databricks on AWS), which, again, process all the files and silently write out bad records in jsons.
I'm not sure it works with COPY INTO, but the point is that I want a *single* file to fully fail if a malformed line is found, and thus a flow to achieve this should
One could also use the additional column "rescuedDataColumn" (Read and write to CSV files | Databricks on AWS) (again, I have to check with COPY INTO), but it wouldn't solve everything because you cannot revert the wrong file when it happens.
You may ask "why do you want a full file to fail in case of a single error?", which is what is triggering so many issues and questions here. The answer is that in my case
All the issues above can be solved using algos that read the files more than once (e.g. file check and then writing), and/or adding many layers of complexity to the COPY INTO command... or with the implementation of a "ignoreMalformedFiles" with some meaningful and easy log. That's why I was looking for a solution like this.
01-09-2024 05:33 AM
I actually found an option that could solve the newline issue I mentioned in my previous post:
setting spark.sql.csv.parser.columnPruning.enabled to false with
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", False)
will consider malformed rows also partial rows (i.e, with a wrong number of separators), meaning that missing fields are NOT automatically filled with nulls (triggering schema errors only in case nullability is not enforced).
The default = True has been set from spark 2.4 (Spark SQL Upgrading Guide - Spark 2.4.0 Documentation (apache.org)) , but I can't really find any detailed explanation of the other consequences.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group