
Running into delta.exceptions.ConcurrentAppendException even after setting up S3 multi-cluster writes via S3DynamoDBLogStore

KiranKondamadug
New Contributor II

My use case is to process a dataset spanning hundreds of partitions concurrently; the data is partitioned, and the writes are disjoint across partitions. I was facing ConcurrentAppendException because S3 does not provide the "put-if-absent" consistency guarantee. From Delta Lake 1.2 onward, with the help of the S3DynamoDBLogStore API, all writers across multiple clusters and/or Spark drivers can concurrently write to Delta Lake on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for reads/writes and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted).

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("Delta Operations") \
        .config("spark.driver.memory", args["spark_driver_memory"]) \
        .config("spark.executor.memory", args["spark_executor_memory"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
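For reference, creating such a DynamoDB table looks roughly like this (a boto3 sketch; the table name and region are placeholders that must match the ddb.* options above, and the key schema follows the S3DynamoDBLogStore documentation):

    import boto3

    ddb = boto3.client("dynamodb", region_name="us-west-2")  # placeholder region

    # Key schema used by io.delta.storage.S3DynamoDBLogStore:
    # partition key "tablePath" and sort key "fileName", both strings.
    ddb.create_table(
        TableName="delta_log",  # placeholder; must match ddb.tableName above
        AttributeDefinitions=[
            {"AttributeName": "tablePath", "AttributeType": "S"},
            {"AttributeName": "fileName", "AttributeType": "S"},
        ],
        KeySchema=[
            {"AttributeName": "tablePath", "KeyType": "HASH"},
            {"AttributeName": "fileName", "KeyType": "RANGE"},
        ],
        ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
    )
    # Read/write auto-scaling is attached separately via Application Auto Scaling
    # (omitted here for brevity).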


Please find the actual logic below:

delta_table.alias("old").merge(

            input_df.alias("new"),

            f"old.{primary_key} = new.{primary_key}") \

            .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \

            .whenMatchedUpdateAll() \

            .whenNotMatchedInsertAll() \

            .execute()

delta_table is the destination table in Delta Lake.

input_df is a combined dataframe of all the inserts and updates.

deletes_df is the dataframe that has just the deletes.
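One thing worth flagging in the snippet above: Column.isin expects literal values (or columns), not a DataFrame, so passing deletes_df directly would fail at runtime. A minimal sketch of what is presumably intended, assuming deletes_df carries the primary-key column:

    # Collect the delete keys into a plain Python list so they can be
    # passed to Column.isin in the whenMatchedDelete condition.
    delete_keys = [row[0] for row in deletes_df.select(primary_key).collect()]
    # ... then in the merge:
    # .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(delete_keys))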


I am still running into delta.exceptions.ConcurrentAppendException despite these settings. Am I doing something wrong?

1 REPLY

Debayan
Databricks Employee

Hi, you can refer to https://docs.databricks.com/optimizations/isolation-level.html#conflict-exceptions and recheck that everything is set up correctly.
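Two details worth rechecking in light of that page. First, the posted config does not show the LogStore class itself being set (it may be among the omitted settings): per the Delta Lake multi-cluster docs it is spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore (use s3a.impl for s3a:// paths), and without it the ddb.* options have no effect. Second, even with the LogStore in place, the conflict-exceptions page recommends making the partition separation explicit in the operation condition so that concurrent transactions provably touch disjoint partitions. A minimal sketch, assuming a hypothetical partition column part whose value each concurrent job owns:

    # Hypothetical: each concurrent job processes exactly one partition value,
    # and the merge condition pins the transaction to that partition.
    partition_value = "2023-01-01"

    delta_table.alias("old").merge(
        input_df.alias("new"),
        f"old.part = '{partition_value}' AND old.{primary_key} = new.{primary_key}") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()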

Please let us know if this helps, and please tag @Debayan in your next response, which will notify me. Thank you!
