cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

How to parallelly merge data into partitions of databricks delta table using PySpark/Spark streaming?

bobbysidhartha
New Contributor

I have a PySpark streaming pipeline which reads data from a Kafka topic, data undergoes thru various transformations and finally gets merged into a databricks delta table. In the beginning we were loading data into the delta table by using the merge function as given below.

This incoming dataframe inc_df had data for all partitions.

merge into main_db.main_delta_table main_dt USING inc_df df ON 
    main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND 
    main_.rule_date=df.rule_date AND main_.rule_id=df.rule_id AND 
    main_.rule_read_start=df.rule_read_start AND 
    main_.company = df.company 
WHEN MATCHED THEN UPDATE SET * 
WHEN NOT MATCHED THEN INSERT *

We were executing the above query on table level.

I have given a very basic diagram of the process in the image below. 

WbOeJBut my delta table is partitioned on continent and year. For example, this is how my partitioned delta table looks like.

6MYWV 

So I tried implementing the merge on partition level and tried to run merge activity on multiple partitions parallelly. i.e. I have created seperate pipelines with the filters in queries on partition levels. Image can be seen below.

merge into main_db.main_delta_table main_dt USING inc_df df ON 
    main_dt.continent in ('AFRICA') AND main_dt.year in (‘202301’) AND 
    main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND 
    main_.rule_date=df.rule_date AND main_.rule_id=df.rule_id AND 
    main_.rule_read_start=df.rule_read_start AND 
    main_.company = df.company 
WHEN MATCHED THEN UPDATE SET * 
WHEN NOT MATCHED THEN INSERT *

But I am seeing an error with concurrency.

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [continent=AFRICA, year=2021] by a concurrent update. Please try the operation again.

I understand that the error is telling me that it cannot update files concurrently. But I have huge volume of data in production and I don't want to perform merge on table level where there are almost 1billion records without proper filters.

Trial2: As an alternate approach,

  1. I saved my incremental dataframe in an S3 bucket (like a staging dir) and end my streaming pipeline there.
  2. Then I have a seperate PySpark job that reads data from that S3 staging dir and performs merge into my main delta table, once again on partition level (I have specified partitions in those jobs as filters)

But I am facing the same exception/error there as well.

Trial3:

I also made another attempt in a different approach as mentioned in the link and `ConcurrentAppendException`  section from that page.

base_delta = DeltaTable.forPath(spark,'s3://PATH_OF_BASE_DELTA_TABLE')
base_delta.alias("main_dt").merge(
    source=final_incremental_df.alias("df"), 
    condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_.rule_date=df.rule_date AND main_.rule_id=df.rule_id AND main_.rule_read_start=df.rule_read_start AND main_.company = df.company, continent='Africa'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()

and

base_delta = DeltaTable.forPath(spark,'s3://PATH_OF_BASE_DELTA_TABLE')
base_delta.alias("main_dt").merge(
    source=final_incremental_df.alias("df"), 
    condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_.rule_date=df.rule_date AND main_.rule_id=df.rule_id AND main_.rule_read_start=df.rule_read_start AND main_.company = df.company, continent='ASIA'")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()

I ran the above merge operations in two separate pipelines. But I am still facing the same issue.

Could anyone let me know how can I design and optimise my streaming pipeline to merge data into delta table on partition level by having multiple jobs parallelly (jobs running on individual partitions)

1 REPLY 1

Anonymous
Not applicable

@bobbysidhartha​ :

When merging data into a partitioned Delta table in parallel, it is important to ensure that each job only accesses and modifies the files in its own partition to avoid concurrency issues. One way to achieve this is to use partition-level locking to prevent concurrent updates to the same partition.

Here is an example of how you can modify your PySpark streaming pipeline to merge data into a partitioned Delta table in parallel:

  1. Create a separate Spark job for each partition you want to update. Each job should have a filter on the partition key to ensure that it only processes the data for that partition.
  2. Use the DeltaTable.forPath() method to create a Delta table object for each partition. You can then use this object to perform the merge operation.
  3. Use the partition-level locking feature of Delta tables to ensure that only one job can access a partition at a time. To do this, you can use the .option("partitionBy", "partition_key").option("replaceWhere", "partition_key = 'partition_value'") method when creating the Delta table object for each partition. This will acquire a lock on the partition and prevent other jobs from modifying it while the merge operation is in progress.
  4. After the merge operation is complete, release the lock on the partition by using the .option("replaceWhere", "1=1") method.

Here is an example code snippet to help you get started:

# Get the partition keys from your Delta table
partition_keys = ["continent", "year"]
 
# Iterate through each partition and create a Spark job to update it
for continent in ["AFRICA", "ASIA", "EUROPE", ...]:
  for year in ["2021", "2022", "2023", ...]:
    # Create a filter for the current partition
    partition_filter = f"continent='{continent}' AND year='{year}'"
    
    # Create a Delta table object for the current partition
    delta_table = DeltaTable.forPath(spark, f"/path/to/delta_table/{continent}/{year}")
    
    # Acquire a lock on the current partition
    delta_table \
      .toDF() \
      .where(partition_filter) \
      .write \
      .option("partitionBy", ",".join(partition_keys)) \
      .option("replaceWhere", partition_filter) \
      .format("delta") \
      .mode("append") \
      .save(delta_table._data_path)
      
    # Perform the merge operation on the current partition
    delta_table \
      .alias("main_dt") \
      .merge(
        source=inc_df.alias("df"), 
        condition="main_dt.continent=df.continent AND main_dt.str_id=df.str_id AND main_.rule_date=df.rule_date AND main_.rule_id=df.rule_id AND main_.rule_read_start=df.rule_read_start AND main_.company = df.company") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
      
    # Release the lock on the current partition
    delta_table \
      .toDF() \
      .where(partition_filter) \
      .write \
      .option("partitionBy", ",".join(partition_keys)) \
      .option("replaceWhere", "1=1") \
      .format("delta") \
      .mode("append") \
      .save(delta_table._data_path)

Note that the above code is just an example and may need to be modified to fit your specific use case. Also, be sure to test this approach on a small subset of your data before running it on your entire dataset to ensure that it works as expected.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.