โ04-05-2024 04:38 AM
Team,
I get a ConcurrentAppendException: Files were added to the root of the table by a concurrent update when trying to update a table which executes via jobs with for each activity in ADF,
I tried with Databricks run time 14.x and set the delete vector property as well..No luck.. Any thoughts
โ04-05-2024 10:05 AM
ConcurrentAppendException occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads.
You can read more about it here: https://docs.databricks.com/en/optimizations/isolation-level.html#concurrentappendexception
โ04-07-2024 11:29 PM
i tried these option and still it fails. Same error .. only difference is it works for couple of updates and then fails, where earlier it fails in the first attempt itself.
I am not keen to implement a re-try mechanism
โ04-14-2024 06:44 PM
Any help please
โ04-16-2024 05:17 AM
Hey,
This issue happens whenever two or more jobs try to write to the same partition for a table.
This exception is often thrown during concurrent DELETE, UPDATE, or MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example :
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Suppose you run the above code concurrently for different dates or countries. Since each job is working on an independent partition on the target Delta table, you donโt expect any conflicts. However, the condition is not explicit enough and can scan the entire table and can conflict with concurrent operations updating any other partitions. Instead, you can rewrite your statement to add specific date and country to the merge condition, as shown in the following example.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
This operation is now safe to run concurrently on different dates and countries.
So, all you need is either reconfigure the jobs to be executed in a sequence or find a way to eliminate the chance that multiple are working with the same partitions.
Deletion vectors are indeed a great feature. However, the concurrency control in enhanced in 14.x with row tracking. To enable it :
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableRowTracking' = true);
Hope it helps,
Best,
Monday
In case of such an issue, I would like to suggest apply retry and try except logic (you can use one of existing libraries) in both concurrent updates - it should help, and jobs won't report any error.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group