11-14-2022 02:02 PM
Afternoon, Community! I've done some research today and found multiple great approaches to accomplish what I'm trying to do, but I'm having trouble understanding exactly which is best suited for my use case.
Suppose you're running Auto Loader on S3, and the data coming in will ultimately end up in a Delta table that already has billions of rows. What is the best way to find out whether any incoming records already exist in the target table? Yes, there is a unique identifier (an MD5 hash of 12 columns) to compare on, but surely it isn't scalable to scan billions of records over and over again? Thoughts?
thanks!
p.s. I got my Databricks swag today after spending my points and while I always love a great tshirt, the cooler bag is sooooo incredibly awesome!
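For concreteness, the brute-force approach I keep coming back to is a left anti-join of the incoming batch against the target on the hash key. A rough PySpark sketch of what I mean (the table name and column names below are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-in for an incoming Auto Loader micro-batch (made-up names)
incoming_df = spark.createDataFrame(
    [("a1b2c3", "payload-1"), ("d4e5f6", "payload-2")],
    ["md5_key", "payload"],
)

# The existing multi-billion-row Delta table (made-up name)
target_df = spark.read.table("target_db.big_table")

# left_anti keeps only the batch rows whose md5_key is NOT in the target,
# i.e. the records that don't already exist
new_rows_df = incoming_df.join(
    target_df.select("md5_key"), on="md5_key", how="left_anti"
)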
11-14-2022 07:53 PM
If your records are partitioned in a way that narrows down the search, you could try writing upsert logic after the Auto Loader code, as in the sketch below.
The upsert logic will insert, update, or delete rows according to your conditions.
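A rough sketch of that pattern, with Auto Loader feeding a Delta MERGE through foreachBatch (all paths, table names, and the cloudFiles.format value below are placeholders):

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def upsert_batch(batch_df, batch_id):
    # MERGE each micro-batch into the target on the unique hash key
    target = DeltaTable.forName(spark, "target_db.big_table")  # placeholder name
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.md5_key = s.md5_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    spark.readStream.format("cloudFiles")                        # Auto Loader
    .option("cloudFiles.format", "json")                         # placeholder format
    .option("cloudFiles.schemaLocation", "s3://bucket/_schema")  # placeholder path
    .load("s3://bucket/landing/")                                # placeholder path
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "s3://bucket/_chk")            # placeholder path
    .start()
)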
11-15-2022 12:10 AM
Hi @Brad Sheridan, we haven't heard from you since the last response from @Aman Sehgal, and I was checking back to see if his suggestions helped you.
Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.
Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
11-15-2022 04:58 AM
Thanks, @Aman Sehgal! I'll try this later this afternoon. Do you happen to have a generic example or a URL showing the best-practice way to implement this in PySpark?
thanks
11-16-2022 02:39 AM
Basically, you add a filter condition to your MERGE query.
For example, let's say your target table is partitioned by year, and your incoming data only contains data for the current year.
When you merge, you can add a predicate like <partitioncolumn> = 2022 to the merge condition. This dramatically reduces the data read, because partitions that can't possibly match are pruned up front.
An example (in Scala, but Python is almost the same; a PySpark version follows below):
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
"""
11-14-2022 10:39 PM
@Brad Sheridan, incredibly awesome! I'm glad you liked the Community swag items. Thanks for all the love!