How to provide UPSERT condition in PySpark

Constantine
Contributor III

I have a table `demo_table_one` into which I want to upsert the following values:

data = [
  (11111, 'CA', '2020-01-26'),
  (11111, 'CA', '2020-02-26'),
  (88888, 'CA', '2020-06-10'),
  (88888, 'CA', '2020-05-10'),
  (88888, 'WA', '2020-07-10'),
  (88888, 'WA', '2020-07-15'),
  (55555, 'WA', '2020-05-15'),
  (55555, 'CA', '2020-03-15'),
]

columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)

The logic is that for each attom_id and state_code combination we only want to keep the latest sell_date.

So the data in my table should end up like this:

[
  (11111 , 'CA', '2020-02-26'),
  (88888 , 'CA', '2020-06-10'),
  (88888 , 'WA', '2020-07-15'),
  (55555 , 'CA', '2020-03-15')
]
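
In other words, from the incoming df I only want the newest row per (attom_id, state_code). Just to make that concrete, the selection could be sketched with a window function like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank rows within each (attom_id, state_code) group, newest sell_date first
# ('YYYY-MM-DD' strings sort correctly as text)
w = Window.partitionBy('attom_id', 'state_code').orderBy(F.col('sell_date').desc())

latest_df = (df
  .withColumn('rn', F.row_number().over(w))
  .filter('rn = 1')   # keep only the newest row per key
  .drop('rn'))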

Here is the code I have to do the upsert:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "demo_table_one")

# perform the UPSERT
(deltaTable.alias('orginal_table')
  .merge(df.alias('update_table'),
         "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("orginal_table.sell_date < update_table.sell_date")
  .execute())

But this inserts all of the incoming rows into the table instead of keeping only the latest sell_date per key.

ACCEPTED SOLUTION

-werners-
Esteemed Contributor III

@John Constantine, according to the docs, whenMatched can have an optional condition.

So I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?


5 REPLIES

Hubert-Dudek
Esteemed Contributor III

During the first load the destination table has no data, so .whenNotMatchedInsertAll() fires for every record. Also, when two new records with the same id and state arrive in the same batch in later upserts, both get inserted. What you need is to aggregate the incoming data before merging: group by 'attom_id' and 'state_code' and take MAX('sell_date').
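
Something like this, just as a sketch of that idea (collapse the incoming batch to one row per key first, then merge with the newer-date condition):

from pyspark.sql import functions as F
from delta.tables import DeltaTable

# one row per (attom_id, state_code), keeping the newest sell_date in the batch
dedup_df = (df
  .groupBy('attom_id', 'state_code')
  .agg(F.max('sell_date').alias('sell_date')))

deltaTable = DeltaTable.forName(spark, 'demo_table_one')

(deltaTable.alias('orginal_table')
  .merge(dedup_df.alias('update_table'),
         'orginal_table.attom_id = update_table.attom_id AND '
         'orginal_table.state_code = update_table.state_code')
  # update only when the incoming sell_date is newer
  .whenMatchedUpdateAll(condition='orginal_table.sell_date < update_table.sell_date')
  .whenNotMatchedInsertAll()
  .execute())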

Can't I do something like this in PySpark?

deltaTable.as("orginal_table")
    .merge(df.as("update_table"), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
    .whenMatched("orginal_table.sell_date < update_table.sell_date")
    .updateAll()
    .whenNotMatched()
    .insertAll()
    .execute()
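
I know the .as() / .whenMatched(...).updateAll() chain above follows the Scala examples; in Python I believe the closest equivalent, reusing deltaTable and df from above, would be roughly:

(deltaTable.alias('orginal_table')
  .merge(df.alias('update_table'),
         'orginal_table.state_code = update_table.state_code AND '
         'orginal_table.attom_id = update_table.attom_id')
  # Scala .whenMatched(cond).updateAll()  ->  Python whenMatchedUpdateAll(condition=cond)
  .whenMatchedUpdateAll(condition='orginal_table.sell_date < update_table.sell_date')
  # Scala .whenNotMatched().insertAll()   ->  Python whenNotMatchedInsertAll()
  .whenNotMatchedInsertAll()
  .execute())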


Hubert-Dudek
Esteemed Contributor III

@John Constantine, can you additionally share what data is already in demo_table_one? We only have df (aliased as update_table) in this example.

Kaniz
Community Manager

Hi @John Constantine, just a friendly follow-up. Do you still need help, or did the responses from @Hubert Dudek and @werners help you find the solution? Please let us know.
