Hi @marcuskw, based on the information and code snippet you provided, it seems that the condition in the whenNotMatchedBySourceUpdate clause does not isolate the specific partition in the Delta table. This can lead to a ConcurrentAppendException when running the merge operation in parallel, even though the table is partitioned.
To avoid this issue, you must make the separation explicit in the operation condition. In the provided code snippet, you can modify the condition in the whenNotMatchedBySourceUpdate clause to include the partition column, similar to how the merge condition already restricts the whenMatchedUpdate and whenNotMatchedInsert clauses.
Here's an updated version of the code snippet:
python
from pyspark.sql import functions as F

(
    deltaTable.alias('t')
    # Pin the merge to a single partition so parallel runs do not overlap.
    .merge(df.alias('c'), f"t.partition = '{partition}' AND t.id = c.id")
    .whenMatchedUpdate(set =
        {
            "t.id": "c.id",
            "t.name": "c.name",
            "t.partition": "c.partition",
            "t.flag": "c.flag"
        }
    )
    .whenNotMatchedInsert(values =
        {
            "t.id": "c.id",
            "t.name": "c.name",
            "t.partition": "c.partition",
            "t.flag": "c.flag"
        }
    )
    # This clause has no source row, so its condition may only
    # reference target columns (no c.id here).
    .whenNotMatchedBySourceUpdate(condition = f"t.partition = '{partition}'",
        set =
        {
            "t.flag": F.lit(True)
        }
    )
    .execute()
)
By including the partition filter (t.partition = '{partition}') in the whenNotMatchedBySourceUpdate clause, you ensure that only the specific partition is scanned and edited, which reduces the chance of a ConcurrentAppendException during parallel execution.
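If you run this merge for many partitions, it may help to build the condition strings in one place so the partition filter is never forgotten. The following is only a minimal sketch: the helper names partition_predicate and merge_conditions are hypothetical (they are not part of the Delta Lake API), and it assumes the aliases t and c and the columns partition and id from the snippet above. It rejects values containing a quote or backslash instead of guessing at escaping rules. The not-matched-by-source condition uses only the partition filter, since that clause has no source row to compare against.

```python
def partition_predicate(partition: str, alias: str = "t", column: str = "partition") -> str:
    """Return a SQL predicate pinning alias.column to one literal partition value.

    Hypothetical helper for illustration; not part of the Delta Lake API.
    """
    # Refuse values that would need escaping inside the SQL string literal.
    if "'" in partition or "\\" in partition:
        raise ValueError(f"unsupported character in partition value: {partition!r}")
    return f"{alias}.{column} = '{partition}'"


def merge_conditions(partition: str) -> dict:
    """Build the two condition strings for a partition-scoped merge."""
    pinned = partition_predicate(partition)
    return {
        # Join condition: partition filter plus the key match against the source.
        "merge": f"{pinned} AND t.id = c.id",
        # Not-matched-by-source condition: target columns only.
        "not_matched_by_source": pinned,
    }
```

You could then pass merge_conditions(partition)["merge"] to .merge(...) and merge_conditions(partition)["not_matched_by_source"] as the condition of whenNotMatchedBySourceUpdate.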