Community Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

Concurrent Update to Delta - Throws error

Databricks_info
New Contributor II

Team,

I get a ConcurrentAppendException: Files were added to the root of the table by a concurrent update when trying to update a table. The update runs in jobs launched from a ForEach activity in ADF.

I tried Databricks Runtime 14.x and also set the deletion vectors property, with no luck. Any thoughts?

 

4 REPLIES

Lakshay
Esteemed Contributor

ConcurrentAppendException occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. 

You can read more about it here: https://docs.databricks.com/en/optimizations/isolation-level.html#concurrentappendexception

I tried these options and it still fails with the same error. The only difference is that it now works for a couple of updates and then fails, whereas earlier it failed on the first attempt.

I am not keen to implement a retry mechanism.

Any help, please?

artsheiko
Valued Contributor III

Hey,

This issue happens whenever two or more jobs try to write to the same partition for a table.

This exception is often thrown during concurrent DELETE, UPDATE, or MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example:

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Suppose you run the above code concurrently for different dates or countries. Since each job is working on an independent partition on the target Delta table, you don't expect any conflicts. However, the condition is not explicit enough: it can scan the entire table and conflict with concurrent operations updating any other partitions. Instead, you can rewrite your statement to add specific date and country to the merge condition, as shown in the following example.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

This operation is now safe to run concurrently on different dates and countries.
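
If each job receives its partition values as parameters (for example, from the ADF ForEach item), the explicit condition can be assembled in one place. The following is only a rough sketch: `MergeConditions`, `partitionPinned`, and `quoted` are hypothetical names, not part of the Delta Lake API, and the sketch assumes the target is aliased as "t" and that partition values are plain strings. Rather than guessing at escaping rules, it rejects values containing quotes or backslashes.

```scala
// Hypothetical helper, not part of the Delta Lake API.
object MergeConditions {

  // Wraps a value in single quotes. Values containing quotes or backslashes
  // are rejected instead of escaped, to keep the sketch dialect-neutral.
  def quoted(value: String): String = {
    require(
      !value.exists(c => c == '\'' || c == '\\'),
      s"Unsupported character in partition value: $value")
    "'" + value + "'"
  }

  // Appends one literal predicate per partition column of the target table
  // to the join condition, so the operation states which partition it touches.
  def partitionPinned(
      joinCondition: String,
      partitionValues: Seq[(String, String)],
      targetAlias: String = "t"): String = {
    val pins = partitionValues.map { case (column, value) =>
      s"$targetAlias.$column = ${quoted(value)}"
    }
    (joinCondition +: pins).mkString(" AND ")
  }
}
```

The resulting string would be passed as the second argument of `merge`, in place of the hand-concatenated condition:

```scala
val condition = MergeConditions.partitionPinned(
  "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country",
  Seq("date" -> "2024-01-01", "country" -> "US")) // example values only
```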

So, you need to either reconfigure the jobs to run in sequence or find a way to ensure that no two of them work on the same partitions at the same time.

Deletion vectors are indeed a great feature. However, concurrency control is enhanced in 14.x with row tracking. To enable it:

ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableRowTracking' = true);

Hope it helps,

Best,
