11-14-2022 02:02 PM
Afternoon Community!! I've done some research today and found multiple great approaches to accomplish what I'm trying to do, but I'm having trouble understanding exactly which is best suited for my use case.
Suppose you're running Auto Loader on S3, and the data coming in will ultimately end up in a Delta table that already has billions of rows. What is the best way to find out whether any of the incoming records already exist in the target table? Yes, there is a unique identifier (an MD5 hash of 12 columns) to compare on, but is it really scalable to scan billions of records over and over again? Thoughts?
thanks!
p.s. I got my Databricks swag today after spending my points, and while I always love a great t-shirt, the cooler bag is so incredibly awesome!
11-14-2022 07:53 PM
If your records are partitioned in a way that narrows down the search, you could try writing upsert logic after the Auto Loader code.
The upsert logic will insert, update, or delete rows according to your conditions.
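A minimal PySpark sketch of that pattern, assuming a Delta target named target_table, an MD5 hash column row_hash as the unique key, and a hypothetical partition column ingest_date to narrow the existence check (table name, column names, and S3 paths are all placeholders):

from delta.tables import DeltaTable

# Placeholder name; the target Delta table with billions of rows.
target = DeltaTable.forName(spark, "target_table")

def upsert_batch(micro_batch_df, batch_id):
    # Merge each micro-batch into the target; rows whose row_hash already
    # exists in the matching partition are skipped (insert-only dedup).
    (target.alias("t")
        .merge(
            micro_batch_df.dropDuplicates(["row_hash"]).alias("s"),
            # The partition predicate keeps the existence check from
            # scanning the whole table.
            "t.ingest_date = s.ingest_date AND t.row_hash = s.row_hash")
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")      # assuming JSON landing files
    .load("s3://my-bucket/landing/")          # placeholder path
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "s3://my-bucket/_checkpoints/dedup")
    .start())

If records that already exist should be updated rather than skipped, add a whenMatchedUpdateAll() clause before the insert.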
11-15-2022 04:58 AM
Thanks @Aman Sehgal! I'll try this later this afternoon. Do you happen to have a generic example or a URL for the best-practice way to implement this in PySpark?
thanks
11-16-2022 02:39 AM
Basically, you add a where condition to your merge query.
For example, say your target table is partitioned by year, and your incoming data contains only data for the current year.
When you merge, you can add a predicate like <partitioncolumn> = 2022 to the merge condition. This dramatically reduces the data read.
An example (in Scala, but Python is almost the same):
spark.sql(s"""
|MERGE INTO $targetTableName
|USING $updatesTableName
|ON $targetTableName.par IN (1,0) AND $targetTableName.id = $updatesTableName.id
|WHEN MATCHED THEN
| UPDATE SET $targetTableName.ts = $updatesTableName.ts
|WHEN NOT MATCHED THEN
| INSERT (id, par, ts) VALUES ($updatesTableName.id, $updatesTableName.par, $updatesTableName.ts)
"""