dropDuplicates

BradSheridan
Valued Contributor

Afternoon Community!! I've done some research today and found multiple great approaches to accomplish what I'm trying to do, but I'm having trouble understanding exactly which is best suited for my use case.

Suppose you're running Auto Loader on S3 and that incoming data will ultimately end up in a Delta table that already has billions of rows. What is the best way to find out whether any of the incoming records already exist in the target table? Yes, there is a unique identifier (an MD5 hash of 12 columns) to compare on, but scanning billions of records over and over again doesn't seem very scalable, does it? Thoughts?
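
For context, the ingest looks roughly like this (a simplified sketch; the bucket path and column names are just placeholders, not my real ones):

from pyspark.sql import functions as F

# Simplified Auto Loader read (path and format options are placeholders)
raw = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("s3://my-bucket/landing/"))

# The unique identifier: an MD5 hash over 12 business columns
keyed = raw.withColumn(
    "row_hash",
    F.md5(F.concat_ws("||", "col1", "col2", "col3"))  # really 12 columns in my case
)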

thanks!

p.s. I got my Databricks swag today after spending my points and while I always love a great tshirt, the cooler bag is sooooo incredibly awesome!

1 ACCEPTED SOLUTION


-werners-
Esteemed Contributor III

Basically, you add a where condition to your merge query.

For example, let's say your target table is partitioned by year, and your incoming data contains only data for the current year.

When you merge, you can add a predicate like <partitioncolumn> = 2022 to the merge condition. This dramatically reduces the amount of data read.

An example (in Scala, but Python is almost the same):

spark.sql(s"""
     |MERGE INTO $targetTableName
     |USING $updatesTableName
     |ON $targetTableName.par IN (1, 0) AND $targetTableName.id = $updatesTableName.id
     |WHEN MATCHED THEN
     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts
     |WHEN NOT MATCHED THEN
     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
     |""".stripMargin)


5 REPLIES

AmanSehgal
Honored Contributor III

If your records are partitioned in a way that narrows down the search, can you try writing upsert logic after your Auto Loader code?

The upsert logic will insert, update, or delete rows as per your conditions.
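
Roughly, that pattern could look like this with Auto Loader plus foreachBatch (a sketch only; the table name, S3 paths, and the hash key column are assumptions, not your actual names):

from delta.tables import DeltaTable

def upsert_to_delta(microbatch_df, batch_id):
    # De-duplicate within the micro-batch on the MD5 key first
    deduped = microbatch_df.dropDuplicates(["row_hash"])  # assumes the MD5 key column is called row_hash
    target = DeltaTable.forName(spark, "main.bronze.target_table")  # placeholder table name
    (target.alias("t")
           .merge(deduped.alias("s"), "t.row_hash = s.row_hash")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

(spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://my-bucket/landing/")                                     # placeholder path
      .writeStream
      .foreachBatch(upsert_to_delta)
      .option("checkpointLocation", "s3://my-bucket/_checkpoints/dedupe")  # placeholder path
      .start())

If the target is partitioned, you can also add the partition predicate to the merge condition (as in the accepted solution above) so the merge doesn't scan the whole table.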

Kaniz
Community Manager

Hi @Brad Sheridan, we haven't heard from you since the last response from @Aman Sehgal, and I was checking back to see if his suggestions helped you.

If you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.

BradSheridan
Valued Contributor

Thanks @Aman Sehgal! I'll try this later this afternoon. Do you happen to have a generic example or a URL showing the best-practice way to implement this in PySpark?

Thanks!


Kaniz
Community Manager

@Brad Sheridan, Incredibly awesome! I'm glad you liked the Community swag items. Thanks for all the love!
