Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Concurrent Writes to the same DELTA TABLE

jeremy98
Contributor III

Hi Community,

My team and I have written some workflows that write to the same table. One of my workflows performs a MERGE operation on the table, while another workflow performs an append. However, these operations can occur simultaneously, leading to conflicts.

As a result, one of my workflows fails with the following error:

ConcurrentAppendException: [DELTA_CONCURRENT_APPEND]
ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.

How can I resolve this issue?

1 ACCEPTED SOLUTION

Sidhant07
Databricks Employee

To resolve the issue of concurrent write conflicts, specifically the `ConcurrentAppendException: [DELTA_CONCURRENT_APPEND]`, you can consider the following strategies:

1. **Isolation Levels**:
- **WriteSerializable**: This is the default isolation level in Delta Lake, which allows certain pairs of concurrent writes to proceed without conflicts. However, if you need stricter isolation, you can set the isolation level to `Serializable` to prevent any reordering of transactions.
- To change the isolation level, use the following command:
```sql
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
```

2. **Row-Level Concurrency**:
- Ensure that row-level concurrency is enabled, which reduces conflicts by detecting changes at the row level. This is generally available on Databricks Runtime 14.2 and above.
- For tables with deletion vectors enabled and without partitioning, row-level concurrency is supported by default. If your table has partitions, it may not support row-level concurrency but can still avoid conflicts between `OPTIMIZE` and other write operations when deletion vectors are enabled.
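- As a minimal sketch (the table name is a placeholder, following the convention above), deletion vectors, which row-level concurrency relies on, can be enabled on an existing table like this:
```python
# Placeholder table name. Deletion vectors are the prerequisite for
# row-level concurrency on unpartitioned tables (DBR 14.2+).
spark.sql("""
    ALTER TABLE <table-name>
    SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
""")
```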

3. **Retry Policy**:
- Implement a retry policy to handle instances of `ConcurrentAppendException`. This involves retrying the operation after a short delay, which can help in resolving transient conflicts.
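- As a rough sketch of such a policy (assuming the `delta-spark` Python package, whose `delta.exceptions` module exposes `ConcurrentAppendException`; the write callable itself is a placeholder):
```python
import time
from delta.exceptions import ConcurrentAppendException

def write_with_retry(write_fn, max_attempts=5, base_delay_s=2):
    """Retry a Delta write when it hits a concurrent-append conflict."""
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()  # e.g. a function that runs your MERGE
        except ConcurrentAppendException:
            if attempt == max_attempts:
                raise
            # Exponential backoff before retrying the transient conflict.
            time.sleep(base_delay_s * 2 ** (attempt - 1))
```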

4. **Avoiding Conflicts with Partitioning**:
- Partition your table by columns that are commonly used in the conditions of your operations. This can make the sets of files disjoint and reduce the likelihood of conflicts.
- For example, if your operations frequently filter by date, partitioning the table by date can help avoid conflicts.
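- As an illustrative sketch (the DataFrame and table name are placeholders), writing the table partitioned by `date` would look like this:
```python
# 'df' is assumed to be an existing DataFrame with a 'date' column.
# Writers that target different dates then touch disjoint sets of files.
(df.write.format("delta")
   .partitionBy("date")
   .mode("append")
   .saveAsTable("events_by_date"))
```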

5. **Explicit Conditions in Operations**:
- Ensure that the conditions in your `MERGE`, `UPDATE`, or `DELETE` operations are explicit enough to avoid scanning the entire table. This can help in reducing conflicts with concurrent operations.
- For example, if your table is partitioned by date and country, include these columns in the merge condition:
```python
from delta.tables import DeltaTable

# 'deltaTable' is the target table; 'source' is the source DataFrame.
deltaTable = DeltaTable.forName(spark, "<table-name>")
deltaTable.alias("t").merge(
    source.alias("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country "
    "AND t.date = '<date>' AND t.country = '<country>'"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

By following these strategies, you can mitigate the conflicts arising from concurrent write operations and reduce the occurrence of `ConcurrentAppendException`. 


4 REPLIES

saurabh18cs
Valued Contributor III

Is your Delta table partitioned? If yes, you cannot make use of row-level concurrency, but with the default WriteSerializable isolation level an append and a merge can work together. Have you changed your isolation level from WriteSerializable to Serializable? How are you doing your merges?

Hi,
Thanks for your answer. The Delta table is not partitioned right now. It could be partitioned by topic type, and if I do the partitioning, one workflow would write only to one partition and the other workflow only to another (see the sketch below).

Maybe that would solve the problem directly?
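For reference, a minimal sketch of that setup, with hypothetical names throughout (a `topic` partition column, an `inference_results` table, and `df_a`/`df_b` DataFrames): the append workflow writes only one topic's rows, while the merge pins its condition to the other topic, so the two writers touch disjoint partitions.

```python
from delta.tables import DeltaTable

# Hypothetical names throughout. Workflow A appends only rows for
# topic 'A', so it only ever adds files under that partition.
(df_a.filter("topic = 'A'")
     .write.format("delta")
     .mode("append")
     .saveAsTable("inference_results"))

# Workflow B merges with its condition pinned to topic 'B', so it
# never reads or rewrites the files that workflow A appends.
target = DeltaTable.forName(spark, "inference_results")
(target.alias("t")
       .merge(df_b.alias("s"), "s.id = t.id AND t.topic = 'B'")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())
```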

The table has only these properties:

```sql
USING delta
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.enableDeletionVectors' = 'true',
  'delta.feature.appendOnly' = 'supported',
  'delta.feature.changeDataFeed' = 'supported',
  'delta.feature.deletionVectors' = 'supported',
  'delta.feature.invariants' = 'supported',
  'delta.minReaderVersion' = '3',
  'delta.minWriterVersion' = '7',
  'description' = 'Inference result for a given UP')
```

One workflow performs UPSERT (MERGE) operations, while the other only appends directly to that table.

saurabh18cs
Valued Contributor III

Are you using Databricks Runtime 14.2 or above? If not, can you switch and try?

