This exception often occurs when concurrent operations may be physically updating different partition directories, but one of them may read the same partition that the other one concurrently updates, thus causing a conflict.
Even though you're working with different rows of data, the condition in your MERGE command might not be explicit enough and can scan the entire table, which can conflict with concurrent operations updating any other partitions.
To avoid this, you can make the separation explicit in the operation condition. For example, if your 'deltaTable' is partitioned by date and country, you can add specific date and country to the merge condition. Here's an example:
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
This operation is now safe to run concurrently on different dates and countries. Please replace <date>
and <country>
with your specific values.