<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to provide UPSERT condition in PySpark in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22956#M15810</link>
<description>&lt;P&gt;@John Constantine, can you also share what data is in demo_table_one? We only see df (aliased as update_table) in that example.&lt;/P&gt;</description>
    <pubDate>Fri, 15 Apr 2022 11:12:07 GMT</pubDate>
    <dc:creator>Hubert-Dudek</dc:creator>
    <dc:date>2022-04-15T11:12:07Z</dc:date>
    <item>
      <title>How to provide UPSERT condition in PySpark</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22952#M15806</link>
<description>&lt;P&gt;I have a table `demo_table_one` into which I want to upsert the following values:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;data = [
  (11111 , 'CA', '2020-01-26'),
  (11111 , 'CA', '2020-02-26'),
  (88888 , 'CA', '2020-06-10'),
  (88888 , 'CA', '2020-05-10'),
  (88888 , 'WA', '2020-07-10'),
  (88888 , 'WA', '2020-07-15'),
  (55555 , 'WA', '2020-05-15'),
  (55555 , 'CA', '2020-03-15'),
]

columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The logic is that for each attom_id &amp;amp; state_code we only want the latest sell_date.&lt;/P&gt;&lt;P&gt;So the data in my table should look like this:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;[
  (11111 , 'CA', '2020-02-26'),
  (88888 , 'CA', '2020-06-10'),
  (88888 , 'WA', '2020-07-15'),
  (55555 , 'CA', '2020-03-15')
]&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;And I have the following code to do it:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "demo_table_one")

# perform the UPSERT
(deltaTable.alias('original_table')
  .merge(df.alias('update_table'), "original_table.state_code = update_table.state_code and original_table.attom_id = update_table.attom_id")
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("original_table.sell_date &amp;lt; update_table.sell_date")
  .execute())&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;But this inserts all the values into the table.&lt;/P&gt;</description>
      <pubDate>Wed, 13 Apr 2022 18:07:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22952#M15806</guid>
      <dc:creator>Constantine</dc:creator>
      <dc:date>2022-04-13T18:07:52Z</dc:date>
    </item>
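To pin the requirement down, here is a small plain-Python model of the intended outcome (a sketch of the semantics only, not the Delta API; `keep_latest` is a hypothetical helper). Note that (55555, 'WA', '2020-05-15') is a distinct (attom_id, state_code) key, so the deduplicated result actually has five rows, not the four shown in the question:

```python
DATA = [
    (11111, 'CA', '2020-01-26'),
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'CA', '2020-05-10'),
    (88888, 'WA', '2020-07-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]

def keep_latest(rows):
    # For each (attom_id, state_code) key, keep the greatest sell_date.
    # ISO-8601 date strings sort lexicographically, so max() compares
    # them correctly without parsing.
    latest = {}
    for attom_id, state, sell_date in rows:
        key = (attom_id, state)
        latest[key] = max(latest.get(key, sell_date), sell_date)
    return sorted((a, s, d) for (a, s), d in latest.items())
```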
    <item>
      <title>Re: How to provide UPSERT condition in PySpark</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22953#M15807</link>
<description>&lt;P&gt;On the first run the destination table is empty, so .whenNotMatchedInsertAll() fires for every record. And on later upserts, when two new records with the same id and state arrive in the same batch, both will be inserted. What you need is to aggregate the data before merging: group by 'attom_id' and 'state_code' and take MAX('sell_date').&lt;/P&gt;</description>
      <pubDate>Wed, 13 Apr 2022 18:52:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22953#M15807</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2022-04-13T18:52:44Z</dc:date>
    </item>
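The two failure modes described above can be reproduced with a toy model of a single merge pass (`merge_once` is a hypothetical sketch, not Delta's implementation): source rows are matched against the target's keys as of the start of the merge, mirroring SQL MERGE semantics, so two batch rows sharing a brand-new key both take the insert branch.

```python
def merge_once(target, batch):
    # Toy model of one MERGE pass over lists of
    # (attom_id, state_code, sell_date) tuples.
    existing = {(a, s) for a, s, _ in target}  # keys as of merge start
    result = list(target)
    for a, s, d in batch:
        if (a, s) in existing:
            # models whenMatchedUpdateAll with a "newer date wins" rule
            result = [(ta, ts, max(td, d)) if (ta, ts) == (a, s)
                      else (ta, ts, td) for ta, ts, td in result]
        else:
            # models whenNotMatchedInsertAll: `existing` is NOT updated,
            # so a second batch row with the same new key also inserts
            result.append((a, s, d))
    return result
```

On an empty target, the eight-row batch from the question ends up as eight inserted rows; aggregating the batch to one (attom_id, state_code, MAX(sell_date)) row per key before merging avoids this.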
    <item>
      <title>Re: How to provide UPSERT condition in PySpark</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22954#M15808</link>
<description>&lt;P&gt;Can't I do something like this in PySpark?&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;deltaTable.as("original_table")
    .merge(df.as("update_table"), "original_table.state_code = update_table.state_code and original_table.attom_id = update_table.attom_id")
    .whenMatched("original_table.sell_date &amp;lt; update_table.sell_date")
    .updateAll()
    .whenNotMatched()
    .insertAll()
    .execute()&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Wed, 13 Apr 2022 18:57:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22954#M15808</guid>
      <dc:creator>Constantine</dc:creator>
      <dc:date>2022-04-13T18:57:16Z</dc:date>
    </item>
    <item>
      <title>Re: How to provide UPSERT condition in PySpark</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22955#M15809</link>
<description>&lt;P&gt;@John Constantine, &lt;A href="https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder" alt="https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder" target="_blank"&gt;according to the docs, whenMatched can take an optional condition&lt;/A&gt;.&lt;/P&gt;&lt;P&gt;So I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?&lt;/P&gt;</description>
      <pubDate>Thu, 14 Apr 2022 14:32:00 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22955#M15809</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2022-04-14T14:32:00Z</dc:date>
    </item>
    <item>
      <title>Re: How to provide UPSERT condition in PySpark</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22956#M15810</link>
<description>&lt;P&gt;@John Constantine, can you also share what data is in demo_table_one? We only see df (aliased as update_table) in that example.&lt;/P&gt;</description>
      <pubDate>Fri, 15 Apr 2022 11:12:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-provide-upsert-condition-in-pyspark/m-p/22956#M15810</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2022-04-15T11:12:07Z</dc:date>
    </item>
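Putting the thread's advice together: pre-aggregate the incoming batch to one row per (attom_id, state_code) keeping MAX(sell_date), then merge with a newer-date-wins rule. Below is a plain-Python sketch of that recipe (`upsert_latest` is a hypothetical helper, not the Delta API; in PySpark the aggregation step would be something like df.groupBy('attom_id', 'state_code').agg(F.max('sell_date'))):

```python
def upsert_latest(table, batch):
    # Step 1: deduplicate the batch so each (attom_id, state_code)
    # key appears once, with its greatest sell_date.
    deduped = {}
    for a, s, d in batch:
        deduped[(a, s)] = max(deduped.get((a, s), d), d)
    # Step 2: merge into the table, updating a key only when the
    # incoming date is newer, inserting keys not yet present.
    merged = {(a, s): d for a, s, d in table}
    for key, d in deduped.items():
        merged[key] = max(merged.get(key, d), d)
    return sorted((a, s, d) for (a, s), d in merged.items())
```

Running the same batch twice is a no-op the second time, which is the idempotency an upsert should provide.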
  </channel>
</rss>

