Avoiding a ConcurrentAppendException requires a good partitioning strategy, and my partitioning logic works without fault for the "whenMatchedUpdate" and "whenNotMatchedInsert" clauses.
When I add "whenNotMatchedBySourceUpdate", however, its condition does not seem to restrict the operation to the specific partition of the Delta table.
So when several of these merges run in parallel (one per partition), they fail with a ConcurrentAppendException, even though the table is set up with "partition" as its partition column.
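from pyspark.sql import functions as F

# deltaTable (a DeltaTable), df (a DataFrame) and partition (a str)
# are defined earlier; each parallel job handles one partition value.
(
    deltaTable.alias('t')
    .merge(df.alias('c'), f"t.partition = '{partition}' AND t.id = c.id")
    .whenMatchedUpdate(set={
        "t.id": "c.id",
        "t.name": "c.name",
        "t.partition": "c.partition",
        "t.flag": "c.flag",
    })
    .whenNotMatchedInsert(values={
        "t.id": "c.id",
        "t.name": "c.name",
        "t.partition": "c.partition",
        "t.flag": "c.flag",
    })
    # Flag target rows in this partition that have no counterpart in df
    .whenNotMatchedBySourceUpdate(
        condition=f"t.partition = '{partition}'",
        set={"t.flag": F.lit(True)},
    )
    .execute()
)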
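To check my understanding of the clause semantics, here is a toy pure-Python model of which rows each clause of the merge above would act on for one partition's run. The helper `classify_target_rows` and the sample rows are hypothetical, and this is not Delta Lake's implementation: it models only the logical MERGE semantics, not which files Delta reads or how it detects conflicts.

```python
# Hypothetical toy model of MERGE clause semantics; this is NOT Delta Lake code.
# Rows are plain dicts. The merge condition mirrors the one in the question:
#   t.partition = '<partition>' AND t.id = c.id


def classify_target_rows(target, source, partition):
    """Return the rows each merge clause would act on, per MERGE semantics."""

    def matches(t, c):
        # Same predicate as the merge condition above.
        return t["partition"] == partition and t["id"] == c["id"]

    matched = []                # target rows handled by whenMatchedUpdate
    not_matched_by_source = []  # target rows with no matching source row
    for t in target:
        if any(matches(t, c) for c in source):
            matched.append(t)
        else:
            # Every unmatched target row lands here, including rows
            # from *other* partitions, because they fail the merge condition.
            not_matched_by_source.append(t)

    # The clause condition t.partition = '<partition>' then limits which
    # not-matched-by-source rows actually get flag = True.
    flagged = [t for t in not_matched_by_source if t["partition"] == partition]

    # Source rows with no matching target row go to whenNotMatchedInsert.
    inserted = [c for c in source if not any(matches(t, c) for t in target)]

    return {
        "matched": matched,
        "not_matched_by_source": not_matched_by_source,
        "flagged": flagged,
        "inserted": inserted,
    }


# Hypothetical sample data: merging partition "a" into a table that also holds "b".
target_rows = [
    {"id": 1, "partition": "a"},
    {"id": 2, "partition": "a"},
    {"id": 1, "partition": "b"},
]
source_rows = [
    {"id": 1, "partition": "a"},
    {"id": 3, "partition": "a"},
]

if __name__ == "__main__":
    for clause, rows in classify_target_rows(target_rows, source_rows, "a").items():
        print(clause, rows)
```

If this reading is right, the row `{"id": 1, "partition": "b"}` is logically "not matched by source" but is filtered out by the clause condition, so what I am unsure about is whether Delta also uses that condition to limit the part of the table it reads.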
Have I misunderstood the merge syntax, or does whenNotMatchedBySourceUpdate scan the whole table and ignore the condition?