11-14-2022 02:02 PM
Afternoon Community!! I did some research today and found several good approaches for what I'm trying to do, but I'm having trouble working out which one best suits my use case.
Suppose you're running Auto Loader on S3, and the incoming data ultimately lands in a Delta table that already has billions of rows. What is the best way to find out whether any incoming records already exist in the target table? Yes, there is a unique identifier (an MD5 hash of 12 columns) to compare on, but surely it isn't scalable to look through billions of records over and over again? Thoughts?
thanks!
p.s. I got my Databricks swag today after spending my points and while I always love a great tshirt, the cooler bag is sooooo incredibly awesome!
11-14-2022 07:53 PM
If your records are partitioned in a way that lets you narrow down the search, can you try writing upsert logic after the Auto Loader code?
The upsert logic will insert, update or delete rows according to your conditions.
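A rough sketch of what that could look like in PySpark, using foreachBatch to run a MERGE on every Auto Loader micro-batch. The table name target_table, the view name updates, the hash column row_hash and the paths in the commented-out stream are all placeholders, not names from this thread, and DataFrame.sparkSession assumes PySpark 3.3 or later.

```python
def upsert_batch(micro_batch_df, batch_id):
    """Merge one Auto Loader micro-batch into the target Delta table.

    Assumed names (not from the thread): target_table, updates, row_hash.
    """
    # Remove duplicates inside the batch first: MERGE raises an error when
    # several source rows match the same target row.
    deduped = micro_batch_df.dropDuplicates(["row_hash"])
    deduped.createOrReplaceTempView("updates")

    # Insert only the rows whose hash is not in the target yet.
    deduped.sparkSession.sql("""
        MERGE INTO target_table AS t
        USING updates AS s
        ON t.row_hash = s.row_hash
        WHEN NOT MATCHED THEN INSERT *
    """)


# Wiring it into an Auto Loader stream (placeholders in angle brackets):
# (spark.readStream.format("cloudFiles")
#      .option("cloudFiles.format", "json")
#      .option("cloudFiles.schemaLocation", "<schema-path>")
#      .load("s3://<bucket>/<path>")
#      .writeStream
#      .foreachBatch(upsert_batch)
#      .option("checkpointLocation", "<checkpoint-path>")
#      .start())
```

On its own this still compares against the whole target table; adding a partition predicate to the ON clause is what keeps the scan small.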
11-15-2022 12:10 AM
Hi @Brad Sheridan, we haven't heard from you since the last response from @Aman Sehgal, and I was checking back to see if his suggestions helped you.
Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.
Also, please don't forget to click on the "Select As Best" button whenever the information provided helps resolve your question.
11-15-2022 04:58 AM
Thanks @Aman Sehgal! I'll try this later this afternoon. Do you happen to have a generic example, or a URL showing the best-practice way to implement this in PySpark?
thanks
11-16-2022 02:39 AM
Basically, you add a partition filter to your merge condition.
For example, let's say your target table is partitioned by year and your incoming data contains only data for the current year.
When you merge, you can add a predicate such as <partitioncolumn> = 2022 on the target table to the ON clause. Only the matching partitions are then scanned, which dramatically reduces the data read.
An example (in Scala, but Python is almost the same):
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
|""".stripMargin)
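Since the question was about PySpark, here is a hedged Python version of the same statement. The helper build_merge_sql is hypothetical (not part of any library); it assumes integer partition values and reuses the par, id and ts columns from the example above. The point is that the partition values end up as literals in the ON clause, so Delta can prune on them.

```python
def build_merge_sql(target, updates, partition_values):
    """Return a MERGE statement restricted to the given target partitions.

    Hypothetical helper: assumes an integer partition column named par
    and the id / ts columns from the Scala example.
    """
    if not partition_values:
        raise ValueError("need at least one partition value to prune on")

    # Literal values in the ON clause are what allow partition pruning.
    in_list = ", ".join(str(int(v)) for v in sorted(set(partition_values)))

    return f"""
MERGE INTO {target} AS t
USING {updates} AS s
ON t.par IN ({in_list}) AND t.id = s.id
WHEN MATCHED THEN
  UPDATE SET t.ts = s.ts
WHEN NOT MATCHED THEN
  INSERT (id, par, ts) VALUES (s.id, s.par, s.ts)
"""


# Possible usage inside a notebook or foreachBatch function, where
# updates_df is the incoming batch registered as the view "updates":
# parts = [r.par for r in updates_df.select("par").distinct().collect()]
# spark.sql(build_merge_sql("target_table", "updates", parts))
```

Collecting the distinct partition values from the incoming batch is cheap as long as the batch touches only a handful of partitions, which is the situation described above.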
11-14-2022 10:39 PM
@Brad Sheridan, Incredibly awesome! I'm glad you liked the Community swag items. Thanks for all the love!