Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark JDBC Write Fails for Record Not Present - PK error

jorperort
Contributor

Good afternoon everyone,

I’m writing this post to see if anyone has encountered this problem and if there is a way to resolve it or understand why it happens. I’m working in a Databricks Runtime 15.4 LTS environment, which includes Apache Spark 3.5.0 and Scala 2.12. The target database is SQL Server, and the table in question has a composite primary key that prevents inserting duplicates. Part of its definition is as follows: the primary key is composed of the columns Cod1, Cod2, and Cod3, and the table settings include options such as STATISTICS_NORECOMPUTE set to OFF, IGNORE_DUP_KEY set to OFF, and OPTIMIZE_FOR_SEQUENTIAL_KEY set to ON.

The process consists of two main parts. First, I have a DataFrame called today_df, which is generated from several Delta tables and undergoes various transformations. Second, I have yesterday_df, which is read from the SQL Server table so that the two sources can be synchronized. The goal is to compare the two sources to detect differences. To achieve this, I generate a hash of the values in each row, excluding the primary key columns, and use it to identify new records, deleted records, and records that need to be updated.
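
As a rough sketch of the comparison logic (the real transformations are more involved and inserts_df actually comes from a right join between the sources; the anti join below expresses the same idea, and both DataFrames are assumed to share the same schema):

from pyspark.sql import functions as F

# Composite primary key from the table definition; the remaining columns are hashed.
pk_cols = ["Cod1", "Cod2", "Cod3"]
value_cols = [c for c in today_df.columns if c not in pk_cols]

def with_row_hash(df):
    # Hash of all non-key columns, used to detect changed rows.
    return df.withColumn(
        "row_hash",
        F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in value_cols]), 256),
    )

today_h = with_row_hash(today_df)
yesterday_h = with_row_hash(yesterday_df)

# New records: key present today but not yesterday.
inserts_df = today_h.join(yesterday_h.select(pk_cols), pk_cols, "left_anti")

# Deleted records: key present yesterday but not today.
deletes_df = yesterday_h.join(today_h.select(pk_cols), pk_cols, "left_anti")

# Updated records: same key, different hash.
updates_df = (
    today_h.alias("t")
    .join(yesterday_h.alias("y"), pk_cols, "inner")
    .where(F.col("t.row_hash") != F.col("y.row_hash"))
    .select("t.*")
)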

The problem occurs when trying to insert the new records into SQL Server. I use a standard Spark JDBC write in append mode, specifying the SQL Server driver, connection URL, user, password, target table, batch size, number of partitions, and isolation level, and enabling statement rewriting and compression. Despite this, I receive a primary key violation error indicating that inserts_df contains a record whose primary key already exists in the database. However, upon reviewing the data in inserts_df, I cannot find any record with a combination of keys matching an existing row in SQL Server.
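
A simplified sketch of the write call (connection details are placeholders, and the driver-specific statement-rewriting and compression options are omitted):

# Simplified version of the JDBC append; the values shown here are placeholders.
(
    inserts_df.write
    .format("jdbc")
    .mode("append")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("dbtable", "Schema.TableName_Version")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("batchsize", 10000)
    .option("numPartitions", 8)
    .option("isolationLevel", "READ_COMMITTED")
    .save()
)

The full error I get back is the following: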

Py4JJavaError: An error occurred while calling oXXX.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage XXX.X failed 4 times, most recent failure: Lost task 8.X in stage XXX.X (TID XXX) (XXX.XXX.XX.XX executor 0): java.sql.BatchUpdateException: Violation of PRIMARY KEY constraint 'PK_TableName_Version'. Cannot insert duplicate key in object 'Schema.TableName_Version'. The duplicate key value is (Key1Value, Key2Value, Key3Value).
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:XXXX)

I found an article in the Databricks knowledge base describing a similar issue, which explains that this can occur when Spark retries writing a partition after a successful commit, potentially creating duplicates in the target. The recommended solution is to create a temporary table in the database and then perform a MERGE with the final table.

In my case, I would like to know what alternatives exist to resolve this problem, since the target table is very large, over 40 million records, and I want to avoid using overwrite mode or a temporary table for performance and space reasons.

I would also like to better understand why this behavior occurs, since I cannot locate the record that supposedly already exists in the database. Everything suggests that the conflict happens while one of the inserts_df partitions is being written, yet the record does not appear in the DataFrame itself.

I would greatly appreciate it if anyone who has experienced this same error could share how they resolved it, or if there is any configuration or best practice to avoid such duplicates during the JDBC write process.

Thank you very much in advance for any guidance or experience you can share.


5 REPLIES

ManojkMohan
Honored Contributor

@jorperort When writing to SQL Server tables with composite primary keys from Databricks over JDBC, unique constraint violations are often caused by Spark’s distributed retry logic: https://docs.databricks.com/aws/en/archive/connectors/jdbc

Solutions

Write to Staging Table and Use MERGE:
The recommended way is to route batch writes to a temporary or staging table in SQL Server and then execute a database-level MERGE (upsert); see the sketch after this list.

Tune Write Parallelism:
Adjust numPartitions and batchsize, and manage transaction isolation through the JDBC options, to minimize retry issues. See the official options and guidance on parallelism:
https://docs.databricks.com/aws/en/archive/connectors/jdbc#control-parallelism-for-jdbc-queries

Validate the DataFrame for Duplicates:
Always invoke .dropDuplicates([PK columns]) on the DataFrame before writing.
https://docs.databricks.com/aws/en/archive/connectors/jdbc

SQL Server’s IGNORE_DUP_KEY option can sometimes help, but since yours is OFF, conflicts are not ignored. Databricks guidance on the SQL Server connector:
https://docs.databricks.com/aws/en/ingestion/lakeflow-connect/sql-server-source-setup
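
A minimal sketch of the staging-plus-MERGE pattern, assuming pyodbc is available on the cluster, with placeholder connection details and an illustrative staging table name and column list:

import pyodbc  # assumed to be installed on the cluster, together with the ODBC driver

staging_table = "Schema.TableName_Version_Staging"  # illustrative staging table name
target_table = "Schema.TableName_Version"

# 1) Land the candidate inserts in the staging table via Spark JDBC.
(
    inserts_df.write
    .format("jdbc")
    .mode("overwrite")  # the staging table is rebuilt on every run
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("dbtable", staging_table)
    .option("user", "<user>")
    .option("password", "<password>")
    .save()
)

# 2) MERGE staging into the target. Duplicate rows left behind by Spark
#    retries are harmless: WHEN NOT MATCHED only inserts missing keys.
merge_sql = f"""
MERGE {target_table} AS tgt
USING (SELECT DISTINCT * FROM {staging_table}) AS src
    ON  tgt.Cod1 = src.Cod1
    AND tgt.Cod2 = src.Cod2
    AND tgt.Cod3 = src.Cod3
WHEN NOT MATCHED BY TARGET THEN
    INSERT (Cod1, Cod2, Cod3, ValueCol)  -- column list is illustrative
    VALUES (src.Cod1, src.Cod2, src.Cod3, src.ValueCol);
"""

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=<host>;DATABASE=<db>;UID=<user>;PWD=<password>"
)
try:
    conn.cursor().execute(merge_sql)
    conn.commit()
finally:
    conn.close()

Because the MERGE only inserts keys that are not already in the target, a retried Spark task that re-lands rows in the staging table cannot cause a primary key violation.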

 

Thank you very much for your response, @ManojkMohan. By configuring the different options to avoid the retry problem, it may be possible to prevent the error from occurring. Additionally, if I add a dropDuplicates, could I avoid creating the staging table, or is it mandatory to create it in the SQL Server database?

I still have doubts, because this is not a matter of duplicates in the DataFrame I’m trying to insert. The issue is that the DataFrame does not contain the record that the database flags when the insert is attempted.

For example, when I inspect yesterday_df the record exists in the database; in today_df the record also appears; but in inserts_df that record is not present. Nevertheless, the transaction tries to insert it, even though the DataFrame I’m writing (built with the right join) does not contain it. This leaves me a bit puzzled.
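
To illustrate, this is roughly the check I run before the write (connection details are placeholders; the key columns are the ones from the table definition):

# Rough diagnostic: are any keys in inserts_df already present in SQL Server?
pk_cols = ["Cod1", "Cod2", "Cod3"]

existing_keys = (
    spark.read
    .format("jdbc")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("dbtable", "Schema.TableName_Version")
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
    .select(pk_cols)
)

# Semi join: rows of inserts_df whose key already exists in the target table.
conflicts = inserts_df.join(existing_keys, pk_cols, "left_semi")
print(conflicts.count())  # returns 0 for me, and yet the insert still fails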

 

Ultimately, my question is: is it mandatory to create the staging table, or could a configuration that avoids retries, together with the other options I’m currently exploring, allow me to skip creating this table and performing the MERGE?

@jorperort 

Creating and using a staging table followed by a MERGE operation in SQL Server is considered best practice, especially when working with a distributed system like Spark.

The issue often arises not from duplicates in the DataFrame itself but from transactional or concurrency states in SQL Server combined with Spark retries: records that are logically excluded by the DataFrame operations are still attempted at the database level because multiple distributed Spark tasks retry their inserts. These conflicts typically cannot be resolved by DataFrame de-duplication alone.

Thus, the staging table plus a database-level MERGE remains the most robust solution to guarantee consistency, avoid primary key violations, and handle concurrent write attempts reliably.

Databricks documentation on JDBC writes recommends routing batch writes to a staging table for subsequent MERGE:
https://docs.databricks.com/aws/en/archive/connectors/jdbc.html#write-to-staging-table-and-use-merge

Controlling parallelism and avoiding retries:
https://docs.databricks.com/aws/en/archive/connectors/jdbc.html#control-parallelism-for-jdbc-queries
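
For example, a hedged way to shrink the retry window (connection details are placeholders, other options as in the original write) is to reduce the write to a single partition so that only one task streams batches to SQL Server:

# Single-partition write: fewer tasks means a narrower window for retried batches.
(
    inserts_df.coalesce(1).write
    .format("jdbc")
    .mode("append")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("dbtable", "Schema.TableName_Version")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("numPartitions", 1)
    .save()
)

This narrows the window for duplicate attempts but does not remove it, which is why the staging table plus MERGE is still the safer pattern.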

Summary: unless your environment guarantees zero retries, perfect concurrency, and data uniqueness across all distributed tasks, creating a staging table followed by a controlled MERGE operation is effectively mandatory.

Hi @ManojkMohan,

 

Thanks for your note about Spark’s distributed retries causing PK violations. I’ll go with the staging table approach, but I have a quick question: does this issue happen only with composite primary keys (like my three-column PK), or could it also occur with a single-column PK?

 

If it’s only for composite keys, I might be able to merge my three columns into a single PK and avoid the staging table. Does that sound feasible?

 

Thanks!

@jorperort 

Merging your composite PK columns into a single-column primary key would not inherently eliminate the concurrency or retry conflicts that cause the duplicates: if multiple distributed Spark partitions retry the same inserts independently, logically duplicate rows can still be attempted regardless of how the key is structured.

Using a staging table followed by a controlled MERGE operation is still the most robust and recommended approach to:

Guarantee consistent writes without PK violations

Handle concurrent write attempts reliably

Avoid issues caused by retries from distributed Spark tasks