<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Running into delta.exceptions.ConcurrentAppendException even after setting up S3 Multi-Cluster Writes environment via S3 DynamoDB LogStore in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/running-into-delta-exceptions-concurrentappendexception-even/m-p/3792#M713</link>
    <description>&lt;P&gt;My use case is to process a dataset spanning hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not support the “put-if-absent” consistency guarantee. Since Delta Lake 1.2, the S3DynamoDBLogStore API lets writers across multiple clusters and/or Spark drivers write to Delta Lake on S3 concurrently while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for read/write capacity and passed the configuration to the Delta job. &lt;B&gt;The configuration is below (some Spark-related config omitted):&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;spark = SparkSession \
    .builder \
    .appName("Delta Operations") \
    .config("spark.driver.memory", args["spark_driver_memory"]) \
    .config("spark.executor.memory", args["spark_executor_memory"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')&lt;/PRE&gt;&lt;P&gt;&lt;B&gt;The merge logic is below:&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;delta_table.alias("old").merge(
        input_df.alias("new"),
        f"old.{primary_key} = new.{primary_key}") \
    .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()&lt;/PRE&gt;&lt;P&gt;delta_table is the destination Delta Lake table, input_df is a combined DataFrame of all the inserts and updates, and deletes_df is the DataFrame containing just the deletes.&lt;/P&gt;&lt;P&gt;I am still running into &lt;B&gt;delta.exceptions.ConcurrentAppendException&lt;/B&gt; regardless of these settings. Am I doing something wrong?&lt;/P&gt;</description>
    <pubDate>Wed, 31 May 2023 19:27:05 GMT</pubDate>
    <dc:creator>KiranKondamadug</dc:creator>
    <dc:date>2023-05-31T19:27:05Z</dc:date>
    <item>
      <title>Running into delta.exceptions.ConcurrentAppendException even after setting up S3 Multi-Cluster Writes environment via S3 DynamoDB LogStore</title>
      <link>https://community.databricks.com/t5/data-engineering/running-into-delta-exceptions-concurrentappendexception-even/m-p/3792#M713</link>
      <description>&lt;P&gt;My use case is to process a dataset spanning hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not support the “put-if-absent” consistency guarantee. Since Delta Lake 1.2, the S3DynamoDBLogStore API lets writers across multiple clusters and/or Spark drivers write to Delta Lake on S3 concurrently while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for read/write capacity and passed the configuration to the Delta job. &lt;B&gt;The configuration is below (some Spark-related config omitted):&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;spark = SparkSession \
    .builder \
    .appName("Delta Operations") \
    .config("spark.driver.memory", args["spark_driver_memory"]) \
    .config("spark.executor.memory", args["spark_executor_memory"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", args["log_table_name"]) \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", args["log_region"]) \
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')&lt;/PRE&gt;&lt;P&gt;&lt;B&gt;The merge logic is below:&lt;/B&gt;&lt;/P&gt;&lt;PRE&gt;delta_table.alias("old").merge(
        input_df.alias("new"),
        f"old.{primary_key} = new.{primary_key}") \
    .whenMatchedDelete(condition=col(f"old.{primary_key}").isin(deletes_df)) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()&lt;/PRE&gt;&lt;P&gt;delta_table is the destination Delta Lake table, input_df is a combined DataFrame of all the inserts and updates, and deletes_df is the DataFrame containing just the deletes.&lt;/P&gt;&lt;P&gt;I am still running into &lt;B&gt;delta.exceptions.ConcurrentAppendException&lt;/B&gt; regardless of these settings. Am I doing something wrong?&lt;/P&gt;</description>
      <pubDate>Wed, 31 May 2023 19:27:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/running-into-delta-exceptions-concurrentappendexception-even/m-p/3792#M713</guid>
      <dc:creator>KiranKondamadug</dc:creator>
      <dc:date>2023-05-31T19:27:05Z</dc:date>
    </item>
    <item>
      <title>Re: Running into delta.exceptions.ConcurrentAppendException even after setting up S3 Multi-Cluster Writes environment via S3 DynamoDB LogStore</title>
      <link>https://community.databricks.com/t5/data-engineering/running-into-delta-exceptions-concurrentappendexception-even/m-p/3793#M714</link>
      <description>&lt;P&gt;Hi, you can refer to &lt;A href="https://docs.databricks.com/optimizations/isolation-level.html#conflict-exceptions" alt="https://docs.databricks.com/optimizations/isolation-level.html#conflict-exceptions" target="_blank"&gt;https://docs.databricks.com/optimizations/isolation-level.html#conflict-exceptions&lt;/A&gt; and recheck whether everything is set up correctly.&lt;/P&gt;&lt;P&gt;Please let us know if this helps. Also, please tag @Debayan with your next response, which will notify me. Thank you!&lt;/P&gt;</description>
      <pubDate>Tue, 06 Jun 2023 07:36:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/running-into-delta-exceptions-concurrentappendexception-even/m-p/3793#M714</guid>
      <dc:creator>Debayan</dc:creator>
      <dc:date>2023-06-06T07:36:31Z</dc:date>
    </item>
  </channel>
</rss>