How can I merge data into partitions of a Databricks Delta table in parallel using PySpark/Spark Streaming?
01-13-2023 04:26 AM
I have a PySpark streaming pipeline that reads data from a Kafka topic, applies various transformations, and finally merges the result into a Databricks Delta table. In the beginning we loaded data into the Delta table with the merge statement given below.
The incoming DataFrame inc_df had data for all partitions.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
We were executing the above query at table level.
I have given a very basic diagram of the process in the image below.
But my Delta table is partitioned on continent and year. For example, this is what my partitioned Delta table looks like.
So I tried implementing the merge at partition level and running the merge on multiple partitions in parallel, i.e. I created separate pipelines with partition-level filters in the queries. The image can be seen below.
merge into main_db.main_delta_table main_dt USING inc_df df ON
main_dt.continent in ('AFRICA') AND main_dt.year in ('202301') AND
main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND
main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND
main_dt.rule_read_start=df.rule_read_start AND
main_dt.company = df.company
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
But I am seeing an error with concurrency.
com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [continent=AFRICA, year=2021] by a concurrent update. Please try the operation again.
I understand that the error is telling me that it cannot update files concurrently. But I have a huge volume of data in production, and I don't want to perform the merge at table level, where there are almost 1 billion records, without proper filters.
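Editor's note: one thing worth checking in the partition-level merge above is whether inc_df is also filtered to the pinned partition. The sketch below is a toy model in plain Python, not Spark code. It assumes inc_df still contains rows for every partition (the post states this for the table-level merge, but not for the partition-level runs), and `toy_merge` is a hypothetical helper that only mimics the WHEN MATCHED / WHEN NOT MATCHED logic on one key column. It illustrates that a source row from another partition can never satisfy the pinned ON clause, so it falls into the NOT MATCHED branch and is inserted into its own partition, which would add files outside the partition the job was meant to touch.

```python
# Toy model of MERGE ... WHEN MATCHED UPDATE / WHEN NOT MATCHED INSERT.
# Rows are plain dicts; a "partition" is simply a distinct continent value.

def toy_merge(target, source, pinned_continent):
    """Return the writes a merge would make, grouped by destination partition."""
    touched = {}
    for src in source:
        # ON clause: target row is in the pinned partition AND shares the key.
        matched = any(
            tgt["continent"] == pinned_continent
            and tgt["continent"] == src["continent"]
            and tgt["str_id"] == src["str_id"]
            for tgt in target
        )
        action = "update" if matched else "insert"
        # The written row lands in the partition named by the source row.
        touched.setdefault(src["continent"], []).append((action, src["str_id"]))
    return touched


target = [
    {"continent": "AFRICA", "str_id": 1},
    {"continent": "ASIA", "str_id": 2},
]
source = [
    {"continent": "AFRICA", "str_id": 1},  # exists in the pinned partition
    {"continent": "ASIA", "str_id": 2},    # exists, but in another partition
]

# Source not filtered: the ASIA row is "not matched" and gets inserted into ASIA.
unfiltered = toy_merge(target, source, "AFRICA")
# Source filtered to the pinned partition: only AFRICA is written.
filtered = toy_merge(
    target, [r for r in source if r["continent"] == "AFRICA"], "AFRICA"
)
print(unfiltered)
print(filtered)
```

If this matches what the pipelines do, filtering inc_df to the same continent and year as the ON clause before the merge would keep each job's writes inside its own partition.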
Trial 2: As an alternative approach,
- I saved my incremental DataFrame in an S3 bucket (as a staging directory) and ended my streaming pipeline there.
- Then I have a separate PySpark job that reads data from that S3 staging directory and merges it into my main Delta table, once again at partition level (I specified the partitions in those jobs as filters).
But I am facing the same exception there as well.
Trial 3:
I also made another attempt with a different approach, as described in the link and in the `ConcurrentAppendException` section of that page.
from delta.tables import DeltaTable

base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
# Parentheses let the chained calls span several lines
(base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='AFRICA'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
and
base_delta = DeltaTable.forPath(spark, 's3://PATH_OF_BASE_DELTA_TABLE')
# Same merge as above, pinned to the ASIA partition
(base_delta.alias("main_dt")
    .merge(
        source=final_incremental_df.alias("df"),
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_dt.rule_date=df.rule_date AND main_dt.rule_id=df.rule_id AND main_dt.rule_read_start=df.rule_read_start AND main_dt.company = df.company AND main_dt.continent='ASIA'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
I ran the above merge operations in two separate pipelines, but I am still facing the same issue.
Could anyone tell me how I can design and optimise my streaming pipeline to merge data into the Delta table at partition level, with multiple jobs running in parallel (each job on an individual partition)?
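Editor's note: one possible design for the question above is a single streaming query whose `foreachBatch` function merges each micro-batch, with the target pinned to exactly the partitions present in that batch. This is only a sketch under assumptions: `partition_predicate`, `make_upsert`, `KEY_COLUMNS` and `stream_df` are hypothetical names, `year` is assumed to be stored as a string (as in the '202301' filter above), partition values are assumed to contain no quote characters, and each micro-batch is assumed to hold at most one row per key. The Spark calls used (`foreachBatch`, `DeltaTable.merge`, `whenMatchedUpdateAll`, `whenNotMatchedInsertAll`) are the standard ones; nothing Spark-related runs until the stream is started.

```python
KEY_COLUMNS = ["continent", "str_id", "rule_date", "rule_id",
               "rule_read_start", "company"]


def partition_predicate(pairs, alias="main_dt"):
    """Build a literal predicate on the target's partition columns.

    pairs: iterable of (continent, year) values found in one micro-batch.
    """
    clauses = sorted(
        f"({alias}.continent = '{c}' AND {alias}.year = '{y}')"
        for c, y in set(pairs)
    )
    return "(" + " OR ".join(clauses) + ")" if clauses else "false"


def make_upsert(delta_table):
    """Return a function with the (batch_df, batch_id) signature foreachBatch expects."""
    def upsert(batch_df, batch_id):
        # Partitions actually present in this micro-batch
        rows = batch_df.select("continent", "year").distinct().collect()
        pairs = [(r["continent"], r["year"]) for r in rows]
        if not pairs:
            return
        keys = " AND ".join(f"main_dt.{c} = df.{c}" for c in KEY_COLUMNS)
        condition = partition_predicate(pairs) + " AND " + keys
        (delta_table.alias("main_dt")
            .merge(batch_df.alias("df"), condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    return upsert


# Usage (not executed here; needs a Spark session and the real table path):
# from delta.tables import DeltaTable
# base_delta = DeltaTable.forPath(spark, "s3://PATH_OF_BASE_DELTA_TABLE")
# stream_df.writeStream.foreachBatch(make_upsert(base_delta)).start()

print(partition_predicate([("AFRICA", "2021"), ("ASIA", "2021")]))
```

With one writer there are no concurrent merges to conflict with each other, at the cost of giving up the per-partition parallel jobs; whether that trade-off is acceptable depends on the data volume per micro-batch.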
Labels: Data, Databricks Delta Table
04-10-2023 07:41 AM
@bobbysidhartha :
When merging data into a partitioned Delta table in parallel, it is important to ensure that each job only reads and modifies the files in its own partition. Delta Lake has no partition-level locking feature; it uses optimistic concurrency control, so a merge that may have read files from a partition that another job wrote to fails at commit time with ConcurrentAppendException. The way to avoid the conflict is to make the partition separation explicit in the merge itself.
Here is how you can modify your PySpark pipeline to merge data into a partitioned Delta table in parallel:
- Create a separate Spark job for each partition you want to update. Each job should filter the incoming DataFrame on the partition columns so that it only carries the rows for that partition. Otherwise, rows that belong to other partitions do not match the merge condition and are inserted into those partitions.
- Use the DeltaTable.forPath() method with the root path of the table. A partition subdirectory is not a Delta table by itself, so do not point forPath() at it.
- Add literal predicates on every partition column of the target to the merge condition, for example main_dt.continent = 'AFRICA' AND main_dt.year = '2021'. Your table is partitioned on continent and year, so both need to be pinned. A condition that only says main_dt.continent = df.continent is not explicit enough, because the merge can still scan the whole table and conflict with jobs that update other partitions.
- The replaceWhere option is for selective overwrites, not for locking, so there is nothing to acquire or release around the merge. If a conflict still occurs, retry the operation, as the error message suggests.
Here is an example code snippet to help you get started:
from delta.tables import DeltaTable

# Open the table at its root path (placeholder path)
delta_table = DeltaTable.forPath(spark, "/path/to/delta_table")
# Columns that identify a row, taken from your merge condition
key_match = " AND ".join(
    f"main_dt.{c} = df.{c}"
    for c in ["continent", "str_id", "rule_date", "rule_id", "rule_read_start", "company"])
# In production each (continent, year) pair runs as its own job; the loops only show the per-partition logic
for continent in ["AFRICA", "ASIA", "EUROPE"]:  # add the remaining continents
    for year in ["2021", "2022", "2023"]:  # add the remaining years
        # Keep only the incoming rows for the current partition (assumes inc_df has continent and year columns)
        partition_df = inc_df.where(f"continent = '{continent}' AND year = '{year}'")
        # Pin the target side to the current partition with literal predicates
        partition_filter = f"main_dt.continent = '{continent}' AND main_dt.year = '{year}'"
        # Perform the merge operation on the current partition only
        delta_table \
            .alias("main_dt") \
            .merge(
                source=partition_df.alias("df"),
                condition=f"{partition_filter} AND {key_match}") \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
Note that the above code is just an example and may need to be modified to fit your specific use case. Also, be sure to test this approach on a small subset of your data before running it on your entire dataset to ensure that it works as expected.
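Editor's note: the error message in the question ends with "Please try the operation again", so an occasional conflict can also be handled by retrying the merge. The following is a minimal sketch, not a Databricks API: `retry_on_conflict` and `is_concurrent_append` are hypothetical helpers, and recognising the conflict by looking for "ConcurrentAppendException" in the exception's class name or message is an assumption made so the sketch does not depend on a particular exception class being importable.

```python
import random
import time


def is_concurrent_append(exc):
    """Assumption: the conflict is recognisable by class name or message text."""
    name = "ConcurrentAppendException"
    return name in type(exc).__name__ or name in str(exc)


def retry_on_conflict(run_merge, max_attempts=5, base_delay_s=1.0, sleep=time.sleep):
    """Call run_merge(); on a concurrent-append conflict, back off and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_merge()
        except Exception as exc:
            # Re-raise anything that is not a conflict, or the final failed attempt.
            if not is_concurrent_append(exc) or attempt == max_attempts:
                raise
            # Exponential backoff with jitter so parallel jobs do not retry in lockstep.
            delay = base_delay_s * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay))
```

A job would wrap its merge in a zero-argument function, for example `retry_on_conflict(lambda: merge_builder.execute())`, where `merge_builder` stands for whatever merge chain the job has built. Retrying only hides the symptom; if conflicts are frequent, the merge condition still needs to be made explicit about the partitions involved.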
12-23-2024 04:53 AM - edited 12-23-2024 04:54 AM
This reply seems to have been AI-generated. I would not approve this one.

