I have a notebook that writes to a Delta table with a statement similar to the following:
```python
from delta.tables import DeltaTable

# data, silver_path, update_set, condition and values_set are defined elsewhere in the notebook
match = "current.country = updates.country and current.process_date = updates.process_date"

deltaTable = DeltaTable.forPath(spark, silver_path)
(
    deltaTable.alias("current")
    .merge(data.alias("updates"), match)
    .whenMatchedUpdate(set=update_set, condition=condition)
    .whenNotMatchedInsert(values=values_set)
    .execute()
)
```
The notebook runs in a multi-task job with two tasks that execute in parallel.
When the job runs, the following error is raised:
ConcurrentAppendException: Files were added to partition [country=Panamá, process_date=2022-01-01 00:00:00] by a concurrent update. Please try the operation again.
Each task receives a different country (Panama or Ecuador) and the same date as parameters, so each run should write only the data for its own country. The Delta table is partitioned by the country and process_date fields. What am I doing wrong? How should I specify which partition is affected when using the merge statement?
I would appreciate any clarification on how to work with partitions in cases like this, since this is new to me.
Update: I adjusted the merge condition to specify the country and process date explicitly, as described here (ConcurrentAppendException). Now I get the following error message:
ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
I can't figure out what could be causing this error. I'm still investigating.
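For reference, here is a minimal sketch of the kind of adjustment described in the update. It assumes the approach recommended for ConcurrentAppendException: keep the join on the partition columns, and add literal predicates that pin the target (`current`) side to the task's own country and date, so merges for different countries are expressed as touching disjoint partitions. `quote_sql_string`, `build_merge_condition` and `merge_country_partition` are hypothetical helpers for illustration, not the code actually used in the notebook.

```python
def quote_sql_string(value: str) -> str:
    """Return value as a Spark SQL string literal, escaping backslashes and single quotes."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_merge_condition(country: str, process_date: str) -> str:
    """Join on the partition columns, then pin the target side to one partition."""
    join = ("current.country = updates.country "
            "and current.process_date = updates.process_date")
    # Literal values let conflict detection see which partition this merge reads
    pin = (f"current.country = {quote_sql_string(country)} "
           f"and current.process_date = {quote_sql_string(process_date)}")
    return f"{join} and {pin}"


def merge_country_partition(spark, data, silver_path, country, process_date,
                            update_set, condition, values_set):
    """Run the question's merge, restricted to a single (country, process_date) partition."""
    # Imported lazily so the helpers above work without delta-spark installed
    from delta.tables import DeltaTable

    (
        DeltaTable.forPath(spark, silver_path).alias("current")
        .merge(data.alias("updates"), build_merge_condition(country, process_date))
        .whenMatchedUpdate(set=update_set, condition=condition)
        .whenNotMatchedInsert(values=values_set)
        .execute()
    )
```

Each task would pass its own country (for example `"Panamá"` or `"Ecuador"`) together with the shared date. The date is written as a string literal, which assumes Spark implicitly casts it when comparing it against the `process_date` column.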