My use case is to process a dataset with hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not provide the "put-if-absent" consistency guarantee. Starting with Delta Lake 1.2, the S3DynamoDBLogStore API lets writers across multiple clusters and/or Spark drivers write concurrently to a Delta table on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1.

I created a DynamoDB table with auto-scaling enabled for read/write capacity (a sketch of the table creation follows the Spark config) and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted):
from pyspark.sql import SparkSession

# the two S3DynamoDBLogStore settings point the log store at the DynamoDB table used for commit coordination
spark = SparkSession \
    .builder \
    .appName("Delta Operations") \
    .config("spark.driver.memory", args["spark_driver_memory"]) \
    .config("spark.executor.memory", args["spark_executor_memory"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')
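For completeness, this is roughly how the DynamoDB table was created. It is a minimal boto3 sketch: the table name, region, and provisioned throughput values are placeholders, the tablePath/fileName key schema is the one described in the Delta Lake S3 multi-cluster setup docs, and the auto-scaling policies were attached separately via Application Auto Scaling (not shown).

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")  # placeholder region, matches args["log_region"]

dynamodb.create_table(
    TableName="delta_log",  # placeholder, matches args["log_table_name"]
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},  # partition key
        {"AttributeName": "fileName", "KeyType": "RANGE"},  # sort key
    ],
    BillingMode="PROVISIONED",
    ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},  # placeholders; auto-scaling adjusts these
)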
Please find the actual logic below:
from pyspark.sql.functions import col

# Column.isin() expects plain values rather than a DataFrame, so collect the keys to delete first
delete_keys = [row[primary_key] for row in deletes_df.select(primary_key).collect()]

delta_table.alias("old").merge(
        input_df.alias("new"),
        f"old.{primary_key} = new.{primary_key}") \
    .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(delete_keys)) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
delta_table is the destination Delta table.
input_df is a combined DataFrame of all the inserts and updates.
deletes_df is a DataFrame that has just the deletes.
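For context, here is a simplified sketch of how input_df and deletes_df are derived from the incoming change records; change_df and its op column are hypothetical stand-ins for the actual upstream pipeline.

from pyspark.sql.functions import col

# change_df carries one row per change record with an "op" flag: "I" (insert), "U" (update), "D" (delete)
input_df = change_df.filter(col("op").isin("I", "U")).drop("op")
deletes_df = change_df.filter(col("op") == "D").select(primary_key)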
I am still running into delta.exceptions.ConcurrentAppendException despite these settings. Am I doing something wrong?