Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

dropDuplicates

BradSheridan
Valued Contributor

Afternoon Community!! I've done some research today and found multiple great approaches to accomplish what I'm trying to do, but I'm having trouble understanding exactly which one is best suited for my use case.

Suppose you're running Auto Loader on S3 and the incoming data will ultimately end up in a Delta table that already has billions of rows. What is the best way to find out whether any of the incoming records already exist in the target table? Yes, there is a unique identifier (an MD5 hash of 12 columns) to compare on, but scanning billions of records over and over again doesn't seem very scalable. Thoughts?

thanks!

p.s. I got my Databricks swag today after spending my points, and while I always love a great t-shirt, the cooler bag is sooooo incredibly awesome!


3 REPLIES

AmanSehgal
Honored Contributor III

If your records are partitioned so you can narrow down the search, can you try writing upsert logic after the Auto Loader code?

The upsert logic will insert, update, or delete rows as per your conditions.
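
Something roughly like this could work (an untested sketch; the table name, S3 paths, source format, and the md5_hash key column are placeholders for your actual setup):

from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    # De-duplicate within the micro-batch first, then merge into the target table,
    # inserting only rows whose hash key is not already present.
    deduped = batch_df.dropDuplicates(["md5_hash"])
    target = DeltaTable.forName(spark, "my_db.target_table")  # placeholder table name
    (target.alias("t")
           .merge(deduped.alias("s"), "t.md5_hash = s.md5_hash")
           .whenNotMatchedInsertAll()
           .execute())

(spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")        # placeholder source format
      .load("s3://my-bucket/landing/")            # placeholder S3 path
      .writeStream
      .foreachBatch(upsert_batch)
      .option("checkpointLocation", "s3://my-bucket/checkpoints/target_table")
      .start())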

Thanks @Aman Sehgal! I'll try this later this afternoon. Do you happen to have a generic example, or a URL showing the best-practice way to implement this in PySpark?

thanks

-werners-
Esteemed Contributor III

Basically, you add a where condition to your merge query.

For example, let's say your target table is partitioned by year and your incoming data contains only data for the current year.

If you want to merge, you can add a where <partitioncolumn> = 2022 condition. This dramatically reduces the amount of data read.

An example (in Scala, but Python is almost the same):

spark.sql(s"""

     |MERGE INTO $targetTableName

     |USING $updatesTableName

     |ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id

     |WHEN MATCHED THEN

     |  UPDATE SET $targetTableName.ts = $updatesTableName.ts

     |WHEN NOT MATCHED THEN

     |  INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)

 """
