04-13-2022 11:07 AM
I have a table `demo_table_one` into which I want to upsert the following values:
data = [
    (11111, 'CA', '2020-01-26'),
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'CA', '2020-05-10'),
    (88888, 'WA', '2020-07-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]
columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)
The logic is that for each attom_id and state_code pair we only want the latest sell_date, so the data in my table should look like this:
[
(11111 , 'CA', '2020-02-26'),
(88888 , 'CA', '2020-06-10'),
(88888 , 'WA', '2020-07-15'),
(55555 , 'CA', '2020-03-15')
]
I have the following code to do it:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, "demo_table_one")
# Perform the upsert
(deltaTable.alias('orginal_table')
    .merge(df.alias('update_table'), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
    .whenNotMatchedInsertAll()
    .whenMatchedUpdateAll("orginal_table.sell_date < update_table.sell_date")
    .execute())
But this inserts all the values into the table.
04-14-2022 07:32 AM
@John Constantine, according to the docs, whenMatched can have an optional condition.
So I don't immediately see the issue here. Maybe the whenMatched condition is never true for some reason?
04-13-2022 11:52 AM
The destination table has no data during the first insert, so .whenNotMatchedInsertAll() executes for every record. Also, when two new records with the same id and state arrive together in a later upsert, both are inserted. What you need is to aggregate the data before merging: group by attom_id and state_code and take MAX(sell_date).
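To make the suggested pre-aggregation concrete, here is a minimal sketch in plain Python (no Spark needed) of the reduction the source data would go through before the merge. The helper name `latest_per_key` is hypothetical, and the sketch assumes sell_date is always an ISO 'YYYY-MM-DD' string, as in the sample data.

```python
from typing import Dict, List, Tuple

# One row: (attom_id, state_code, sell_date as an ISO 'YYYY-MM-DD' string)
Row = Tuple[int, str, str]


def latest_per_key(rows: List[Row]) -> List[Row]:
    """Keep one row per (attom_id, state_code): the one with the latest sell_date.

    Hypothetical helper for illustration; it mirrors a GROUP BY with MAX(sell_date).
    """
    latest: Dict[Tuple[int, str], str] = {}
    for attom_id, state_code, sell_date in rows:
        key = (attom_id, state_code)
        # ISO-8601 date strings sort chronologically, so string comparison works here.
        if key not in latest or sell_date > latest[key]:
            latest[key] = sell_date
    # Sort only to make the output order deterministic.
    return sorted((key[0], key[1], date) for key, date in latest.items())


data = [
    (11111, 'CA', '2020-01-26'),
    (11111, 'CA', '2020-02-26'),
    (88888, 'CA', '2020-06-10'),
    (88888, 'CA', '2020-05-10'),
    (88888, 'WA', '2020-07-10'),
    (88888, 'WA', '2020-07-15'),
    (55555, 'WA', '2020-05-15'),
    (55555, 'CA', '2020-03-15'),
]

deduped = latest_per_key(data)
for row in deduped:
    print(row)
```

With the sample data this leaves five rows, because (55555, 'WA', '2020-05-15') is the only row for its key and therefore survives. In PySpark the same reduction could likely be written as `df.groupBy('attom_id', 'state_code').agg(F.max('sell_date').alias('sell_date'))` (with `from pyspark.sql import functions as F`), and the result passed to `.merge(...)` in place of `df`.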
04-13-2022 11:57 AM
Can't I do something like this in PySpark?
deltaTable.as("orginal_table")
.merge(df.as("update_table"), "orginal_table.state_code = update_table.state_code and orginal_table.attom_id = update_table.attom_id")
.whenMatched("orginal_table.sell_date < update_table.sell_date")
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
04-15-2022 04:12 AM
@John Constantine, can you also share what data is in demo_table_one? The example only shows df (alias update_table).
04-26-2022 03:40 AM
Hi @John Constantine, just a friendly follow-up. Do you still need help, or did the responses from @Hubert Dudek and @werners help you find the solution? Please let us know.