Data Engineering

Running into delta.exceptions.ConcurrentAppendException even after setting up the S3 Multi-Cluster Writes environment via the S3DynamoDBLogStore

KiranKondamadug
New Contributor II

My use case is to process a dataset spanning hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not provide the "put-if-absent" consistency guarantee. From Delta Lake 1.2, with the help of the S3DynamoDBLogStore API, all writers across multiple clusters and/or Spark drivers can concurrently write to Delta Lake on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for read/write capacity and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted).

    spark = SparkSession \
        .builder \
        .appName("Delta Operations") \
        .config("spark.driver.memory", args["spark_driver_memory"]) \
        .config("spark.executor.memory", args["spark_executor_memory"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')
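For reference, the Delta Lake documentation for multi-cluster S3 writes also requires pointing the S3 LogStore implementation at S3DynamoDBLogStore and having the delta-storage-s3-dynamodb artifact on the classpath of every writer. A minimal sketch of that documented shape, with placeholder table name and region:

    from pyspark.sql import SparkSession

    # Sketch only: table name and region are placeholders; the
    # delta-storage-s3-dynamodb JAR must also be on the classpath.
    spark = SparkSession \
        .builder \
        .appName("Delta Operations") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore") \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log") \
        .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1") \
        .getOrCreate()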


Please find the actual logic below:

delta_table.alias("old").merge(
        input_df.alias("new"),
        f"old.{primary_key} = new.{primary_key}") \
    .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

delta_table is the destination Delta Lake table.

input_df is a combined DataFrame of all the inserts and updates.

deletes_df is the DataFrame that has just the deletes.
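A side note on the whenMatchedDelete condition: Column.isin expects literal values (or columns) rather than a DataFrame, so a runnable version would typically flatten deletes_df to a Python list of keys first. A minimal sketch, assuming deletes_df has a single column named after the primary key:

    # Assumption: deletes_df has one column holding the primary-key values to delete.
    delete_keys = [row[0] for row in deletes_df.select(primary_key).collect()]

    delta_table.alias("old").merge(
            input_df.alias("new"),
            f"old.{primary_key} = new.{primary_key}") \
        .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(delete_keys)) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()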


I am still running into delta.exceptions.ConcurrentAppendException despite these settings. Am I doing something wrong?

1 REPLY

Debayan
Esteemed Contributor III

Hi, you can refer to https://docs.databricks.com/optimizations/isolation-level.html#conflict-exceptions and recheck whether everything is set up correctly.
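One pattern that page describes for avoiding ConcurrentAppendException is making the partition disjointness explicit in the MERGE condition, since the DynamoDB LogStore only handles atomic commits and not conflicts between transactions that read the same partitions. A sketch, assuming the table is partitioned by a column named event_date and each concurrent job handles a single partition value held in a hypothetical partition_value variable:

    # Sketch: the extra partition predicate lets Delta verify that concurrent
    # merges into different event_date partitions do not conflict.
    # "event_date" and partition_value are placeholders for this example.
    delta_table.alias("old").merge(
            input_df.alias("new"),
            f"old.event_date = '{partition_value}' AND old.{primary_key} = new.{primary_key}") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()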

Please let us know if this helps; also, please tag @Debayan in your next response, which will notify me. Thank you!
