How to provide UPSERT condition in PySpark

Constantine

I have a table `demo_table_one` into which I want to upsert the following values:

data = [
    (11111, 'CA', '2020-01-26'),
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'CA', '2020-05-10'),
    (88888, 'WA', '2020-07-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]

columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)

The logic is that for each `attom_id` & `state_code` pair we only want to keep the latest `sell_date`.

So the data in my table should look like this:

[
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]
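
For reference, I can derive this expected set directly from `df` with a plain aggregation (this is just my own verification sketch, not the table logic itself):

import pyspark.sql.functions as F

# One row per (attom_id, state_code) with the maximum sell_date;
# dates in 'yyyy-MM-dd' strings compare correctly lexicographically
expected = (df
    .groupBy('attom_id', 'state_code')
    .agg(F.max('sell_date').alias('sell_date')))
expected.show()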

And I have the following code to do it:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "demo_table_one")

# perform the UPSERT
(deltaTable.alias('original_table')
    .merge(
        df.alias('update_table'),
        "original_table.state_code = update_table.state_code AND "
        "original_table.attom_id = update_table.attom_id")
    .whenNotMatchedInsertAll()
    .whenMatchedUpdateAll("original_table.sell_date < update_table.sell_date")
    .execute())

But this inserts all the values into the table instead of only the latest `sell_date` per `attom_id` & `state_code`.
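
My guess (assuming the table starts out empty) is that every source row is unmatched on the first merge, so `whenNotMatchedInsertAll` inserts all eight rows, duplicates included. A sketch of what I am considering: deduplicate `df` with a window function before merging, so the merge only ever sees one row per key. The `row_number` step is my own assumption, not code I have verified:

from delta.tables import DeltaTable
from pyspark.sql import Window
import pyspark.sql.functions as F

# Collapse the source to one row per (attom_id, state_code),
# keeping the latest sell_date, so the merge sees unique keys
w = Window.partitionBy('attom_id', 'state_code').orderBy(F.col('sell_date').desc())
deduped = (df
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .drop('rn'))

deltaTable = DeltaTable.forName(spark, "demo_table_one")
(deltaTable.alias('original_table')
    .merge(
        deduped.alias('update_table'),
        "original_table.state_code = update_table.state_code AND "
        "original_table.attom_id = update_table.attom_id")
    .whenMatchedUpdateAll("original_table.sell_date < update_table.sell_date")
    .whenNotMatchedInsertAll()
    .execute())

Would this be the right approach, or is there a way to express the upsert condition directly in the merge?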