04-13-2022 11:07 AM
I have a table `demo_table_one` into which I want to upsert the following values:
data = [
    (11111, 'CA', '2020-01-26'),
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'CA', '2020-05-10'),
    (88888, 'WA', '2020-07-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]
columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)
The logic is that for each attom_id and state_code pair we only want to keep the latest sell_date, so the data in my table should end up as
[
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'CA', '2020-03-15')
]
I have the following code to do the upsert:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "demo_table_one")

# perform the upsert
(deltaTable.alias('orginal_table')
    .merge(df.alias('update_table'),
           "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
    .whenNotMatchedInsertAll()
    .whenMatchedUpdateAll("orginal_table.sell_date < update_table.sell_date")
    .execute())
But this inserts all the values into the table instead of only the latest sell_date per attom_id and state_code.
04-13-2022 11:52 AM
The destination table has no data during the first merge, so .whenNotMatchedInsertAll() fires for every record. Likewise, whenever two new records with the same attom_id and state_code arrive in the same batch, later upserts will insert both of them. What you need is to aggregate the source data before merging: group by attom_id and state_code and take MAX(sell_date).
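A minimal sketch of that aggregation step, reusing the df from the original post (the merge itself can then stay as written, just with latest_df as the source instead of df):
from pyspark.sql import functions as F

# Collapse the incoming batch to one row per (attom_id, state_code),
# keeping only the most recent sell_date before running the merge.
# sell_date is an ISO-formatted string, so MAX also works chronologically.
latest_df = (df.groupBy("attom_id", "state_code")
               .agg(F.max("sell_date").alias("sell_date")))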
04-13-2022 11:57 AM
Can't I do something like this in PySpark?
deltaTable.as("orginal_table")
.merge(df.as("update_table"), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
.whenMatched("orginal_table.sell_date < update_table.sell_date")
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
04-14-2022 07:32 AM
@John Constantine, according to the docs, whenMatched can take an optional condition, so I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?
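For what it's worth, the chained whenMatched(...).updateAll() builder is the Scala/Java API; in the Python API the condition is passed straight to whenMatchedUpdateAll, and .as would be a syntax error since as is a Python keyword, hence .alias. A minimal sketch of the Python form, reusing the names from the original post:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "demo_table_one")

# Python form: aliases via .alias(), update condition passed as an argument
# to whenMatchedUpdateAll() rather than through a chained whenMatched() call.
(deltaTable.alias("orginal_table")
    .merge(df.alias("update_table"),
           "orginal_table.state_code = update_table.state_code and "
           "orginal_table.attom_id = update_table.attom_id")
    .whenMatchedUpdateAll(condition="orginal_table.sell_date < update_table.sell_date")
    .whenNotMatchedInsertAll()
    .execute())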
04-15-2022 04:12 AM
@John Constantine, can you also share what data is currently in demo_table_one? We only see df (aliased as update_table) in that example.