cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

How to provide UPSERT condition in PySpark

Constantine
Contributor III

I have a table `demo_table_one` in which I want to upsert the following values

data = [
 
  (11111 , 'CA', '2020-01-26'),
  (11111 , 'CA', '2020-02-26'),
  (88888 , 'CA', '2020-06-10'),
  (88888 , 'CA', '2020-05-10'),
  (88888 , 'WA', '2020-07-10'),
  (88888 , 'WA', '2020-07-15'),
  (55555 , 'WA', '2020-05-15'),
  (55555 , 'CA', '2020-03-15'),
  ]
 
columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)

The logic is that for each attom_id & state_code we only want the latest sell_date

So the data in my table should be like

[
  (11111 , 'CA', '2020-02-26'),
  (88888 , 'CA', '2020-06-10'),
  (88888 , 'WA', '2020-07-15'),
  (55555 , 'CA', '2020-03-15')
]

and I have the following code to do it

from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, "demo_table_one") 
 
#perform the UPSERT
 
(deltaTable.alias('orginal_table')
  .merge(df.alias('update_table'), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("orginal_table.sell_date < update_table.sell_date")
  .execute())

But this inserts all the values in the table

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

@John Constantineโ€‹ , According to the docs, whenMatched can have an optional condition.

So I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?

View solution in original post

4 REPLIES 4

Hubert-Dudek
Esteemed Contributor III

It will not have data in the destination during the first insert, so that it will execute .whenNotMatchedInsertAll() for every record. Also, when two new records arrive at once (with the same id and state) in the next upserts, it will insert both. For sure, what you need is to aggregate data before inserting ('attom_id,' 'state_code',MAX( 'sell_date').

Can't I do something like this in PySpark

deltaTable.as("orginal_table")
    .merge(df.as("update_table"), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
    .whenMatched("orginal_table.sell_date < update_table.sell_date")
    .updateAll()
    .whenNotMatched()
    .insertAll()
    .execute()

-werners-
Esteemed Contributor III

@John Constantineโ€‹ , According to the docs, whenMatched can have an optional condition.

So I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?

Hubert-Dudek
Esteemed Contributor III

@John Constantine,โ€‹ can you additionally share what data is in demo_table_one? as we have only df (alias update_table) in that example

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group