Concurrent Update to Delta - Throws error

Databricks_info
New Contributor II

Team,

I get a "ConcurrentAppendException: Files were added to the root of the table by a concurrent update" error when updating a table. The update runs as jobs triggered from a ForEach activity in ADF.

I tried Databricks Runtime 14.x and also set the deletion vectors property. No luck. Any thoughts?

 

5 REPLIES

Lakshay
Databricks Employee

ConcurrentAppendException occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. 

You can read more about it here: https://docs.databricks.com/en/optimizations/isolation-level.html#concurrentappendexception

I tried these options and it still fails with the same error. The only difference is that it now works for a couple of updates and then fails, whereas earlier it failed on the first attempt.

I am not keen to implement a retry mechanism.

Any help, please?

artsheiko
Databricks Employee

Hey,

This issue happens whenever two or more jobs try to write to the same partition for a table.

This exception is often thrown during concurrent DELETE, UPDATE, or MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example:

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Suppose you run the above code concurrently for different dates or countries. Since each job works on an independent partition of the target Delta table, you don't expect any conflicts. However, the condition is not explicit enough: the merge can scan the entire table and therefore conflict with concurrent operations updating any other partition. Instead, you can rewrite your statement to add the specific date and country to the merge condition, as shown in the following example.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

This operation is now safe to run concurrently on different dates and countries.
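If each job receives its date and country as parameters, the explicit condition can be built in one place rather than concatenated inline. The following is only a sketch: `MergeCondition` and `forPartition` are hypothetical names, not part of the Delta API, and the column names are the ones from the example above. It assumes the partition values are plain strings and rejects any value containing a single quote instead of trying to escape it.

```scala
// Hypothetical helper (not part of the Delta API): builds a merge condition
// that pins the target side to a single (date, country) partition.
object MergeCondition {
  // Wrap a value in single quotes; refuse values that contain a quote,
  // since they would break out of the SQL string literal.
  private def quoted(value: String): String = {
    require(!value.contains("'"), s"unsupported character in partition value: $value")
    s"'$value'"
  }

  // Join keys first, then the explicit partition predicates on the target alias `t`.
  def forPartition(date: String, country: String): String =
    Seq(
      "s.user_id = t.user_id",
      "s.date = t.date",
      "s.country = t.country",
      s"t.date = ${quoted(date)}",
      s"t.country = ${quoted(country)}"
    ).mkString(" AND ")
}
```

The resulting string would be passed as the second argument of `merge`, for example `deltaTable.as("t").merge(source.as("s"), MergeCondition.forPartition(date, country))`.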

So, all you need to do is either reconfigure the jobs to run in sequence or find a way to eliminate the chance that several of them work on the same partitions.

Deletion vectors are indeed a great feature. However, concurrency control is enhanced in 14.x with row tracking. To enable it:

ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableRowTracking' = true);

 Hope it helps,

Best,

rpiotr
New Contributor II

In case of such an issue, I would suggest applying retry logic with try/except (you can use one of the existing libraries) in both concurrent updates. It should help, and the jobs won't report any error.
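A minimal sketch of the retry idea, written in Scala to match the merge examples earlier in the thread. `ConflictRetry`, `retry`, and `isAppendConflict` are hypothetical names, and the attempt count and delay are arbitrary. As an assumption, the conflict is recognised by the exception's simple class name so the snippet compiles without the Delta library on the classpath; in a real job you would more likely match on the exception class itself.

```scala
import scala.util.{Failure, Success, Try}

object ConflictRetry {
  // Assumption: identify the conflict by simple class name only,
  // so this sketch has no dependency on the Delta library.
  def isAppendConflict(e: Throwable): Boolean =
    e.getClass.getSimpleName == "ConcurrentAppendException"

  // Runs `op`. On a retryable failure it sleeps, doubles the delay and tries
  // again, up to `maxAttempts` attempts in total. Any other failure, or the
  // failure of the last attempt, is rethrown to the caller.
  def retry[A](
      maxAttempts: Int,
      delayMs: Long,
      retryable: Throwable => Boolean = isAppendConflict
  )(op: => A): A =
    Try(op) match {
      case Success(result) => result
      case Failure(e) if maxAttempts > 1 && retryable(e) =>
        Thread.sleep(delayMs)
        retry(maxAttempts - 1, delayMs * 2, retryable)(op)
      case Failure(e) => throw e
    }
}
```

The merge call would go inside the block, for example `ConflictRetry.retry(maxAttempts = 5, delayMs = 1000) { /* merge ... .execute() */ }`. Because the whole operation is re-run, the block should be safe to execute more than once.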
