Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.

ConcurrentAppendException

Phani1
Valued Contributor II

Hi Team,

We're dealing with a concurrency issue when we run multiple jobs at the same time, and the problem persists even after using partitioning and liquid clustering. We now make sure every update has a sufficiently selective WHERE condition to prevent concurrency conflicts.

Please review the options below and let us know whether this is the correct way to solve the problem, or whether there is a better approach.

Option 1:
Partition the table appropriately, reference the partition columns in the WHERE clause, and use distinct filter criteria for each concurrent call.
Suppose you run the code concurrently for different dates or countries. Since each job works on an independent partition of the target Delta table,
you don't expect any conflicts. However, if the merge condition is not explicit enough, it can scan the entire table and conflict with concurrent operations updating other partitions.
Instead, rewrite the statement to add the specific date and country to the merge condition, as shown in the following example.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t")
  .merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country" +
      " AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Option 2:
Whenever a ConcurrentAppendException occurs, incorporate application-specific retry logic into the code, for example:

import random
import time

retries = 5
while retries > 0:
    try:
        # Run the update statement on the Delta table here.
        break
    except Exception as e:  # ideally catch the specific concurrency exception
        retries -= 1
        delay = 20  # or random.randrange(0, 20) to add jitter
        time.sleep(delay)
        print(str(retries) + " retries left, failed, added delay " + str(delay))
else:
    raise RuntimeError("update failed after all retries")

1 REPLY

Kannathasan
New Contributor III

Option 3 (a bit more involved, but works well):

Suppose you need to run Update, Insert, and Delete operations on the same table at the same time from multiple jobs.

1. Create a dummy (staging) table with the target table's schema plus an additional column called Operation_flag. Insert into it the rows that need to be updated, appended, or deleted in the target table; while inserting, set Operation_flag to Insert, Update, or Delete accordingly.
2. Filter the data in the dummy table by Operation_flag and apply the Update, Insert, and Delete operations one by one against the target table; once everything is done, clear the dummy table.
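The control flow of the two steps above can be sketched in plain Python. This is only an illustration of the pattern, using dicts in place of Delta tables and a hypothetical apply_staged_operations helper; in a real pipeline the staged rows would live in the Delta dummy table and the apply step would issue DELETE, UPDATE, and MERGE statements against the target from a single job.

```python
def apply_staged_operations(target, staging):
    """Apply staged rows to `target` (a dict keyed by row id), then clear staging.

    Each staging entry is (operation_flag, row), where row contains an 'id' key.
    Many writers may append to `staging`; only one job runs this apply step,
    so the target table never sees concurrent writers.
    """
    # Apply one operation type at a time, mirroring step 2 above.
    for flag in ("Delete", "Update", "Insert"):
        for op, row in staging:
            if op != flag:
                continue
            if op == "Delete":
                target.pop(row["id"], None)
            elif op == "Update":
                if row["id"] in target:
                    target[row["id"]].update(row)
            else:  # Insert
                target[row["id"]] = dict(row)
    staging.clear()  # clear the dummy table once everything is done
    return target

# Example: three "jobs" staged their changes; a single apply step runs them all.
target = {1: {"id": 1, "value": "a"}, 2: {"id": 2, "value": "b"}}
staging = [
    ("Update", {"id": 1, "value": "a2"}),
    ("Insert", {"id": 3, "value": "c"}),
    ("Delete", {"id": 2}),
]
apply_staged_operations(target, staging)
```

Because only one job ever touches the target table, no Delta concurrency exception can occur there; the contention is moved to the append-only staging table, where concurrent appends do not conflict.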

