Hi Team,
We're dealing with a concurrency issue when we attempt to run multiple jobs against the same Delta table at the same time, and we still hit the same problem even after using the partitioning and liquid clustering features. We are now making sure every update has a sufficiently selective WHERE condition to prevent concurrency conflicts.
Please review the options below and tell us whether this is the correct way to solve the problem, or whether there is a better approach.
Option 1:
Having tables properly partitioned, referring to the partition columns in the merge condition, and using unique filter criteria for each concurrent call is crucial.
Suppose you run the code concurrently for different dates or countries. Since each job works on an independent partition of the target Delta table,
you would not expect any conflicts. However, if the merge condition is not explicit enough, the operation can scan the entire table and conflict with concurrent operations updating other partitions.
Instead, you can rewrite the statement to add the specific date and country to the merge condition, as shown in the following example.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country " +
      "AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
Option 2:
Whenever a ConcurrentAppendException occurs, incorporate application-specific retry logic into the code, for example:
import random
import time

from delta.exceptions import ConcurrentAppendException

retries_left = 5
while retries_left > 0:
    try:
        update_delta_table()  # placeholder for your UPDATE/MERGE statement on the Delta table
        break
    except ConcurrentAppendException:
        retries_left -= 1
        delay = random.randrange(0, 20)  # random jitter so concurrent jobs do not retry in lockstep
        time.sleep(delay)
        print(f"Attempt failed; {retries_left} retries left, slept {delay}s")
else:
    # while/else runs only if the loop exhausted all retries without a break
    raise RuntimeError("Update failed after all retries")