Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT - deduplication pattern?

Jfoxyyc
Valued Contributor

Say we have an incremental append happening via Auto Loader, where the source filename is added to the dataframe and nothing else. If we want to de-duplicate this data over a rolling window, we can do something like

MERGE INTO logs
USING dedupedLogs
  ON logs.id = dedupedLogs.id AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN MATCHED AND dedupedLogs.date > current_date() - INTERVAL 7 DAYS THEN UPDATE SET *
WHEN NOT MATCHED AND dedupedLogs.date > current_date() - INTERVAL 7 DAYS THEN INSERT *

where dedupedLogs is the last-7-days slice of the incremental append, reduced to one row per key by an ordered hash key where rownum = 1.
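For concreteness, dedupedLogs could be built along these lines. This is only a sketch: the staging table name incremental_logs and the ORDER BY column are assumptions standing in for the actual append target and ordered hash key.

-- Hypothetical sketch: one row per id from the last 7 days.
-- incremental_logs is an assumed name for the Auto Loader append target;
-- replace the ORDER BY with the real ordered hash key.
CREATE OR REPLACE TEMP VIEW dedupedLogs AS
SELECT id, filename, date
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY date DESC) AS rownum
  FROM incremental_logs
  WHERE date > current_date() - INTERVAL 7 DAYS
)
WHERE rownum = 1;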

Can this be done in Delta Live Tables? I've attempted using apply_changes() but keep getting an error that an upstream table has changed. DLT's new algorithm doesn't yet seem to pick up that the second, de-duplicated table should be an incremental table.

2 REPLIES

Anonymous
Not applicable

@Jordan Fox:

If you're getting an error about upstream changes, it might be because the table schema or partitioning has changed. You can try running DESCRIBE EXTENDED logs and DESCRIBE EXTENDED dedupedLogs to compare the schemas and see if there are any differences.
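For example, run both and compare the schema and partition information in the output:

-- Compare the two outputs for schema or partitioning drift
DESCRIBE EXTENDED logs;
DESCRIBE EXTENDED dedupedLogs;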

Yes, it is possible to perform rolling-window de-duplication in Delta Lake using the MERGE operation. You can merge the incremental data into the existing Delta table and update or insert records based on a condition. For example:

MERGE INTO logs
USING (
  SELECT id, filename, date
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY date DESC) AS rownum
    FROM dedupedLogs
    WHERE date > current_date() - INTERVAL 7 DAYS
  ) WHERE rownum = 1
) AS d
ON logs.id = d.id AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN MATCHED THEN UPDATE SET logs.filename = d.filename
WHEN NOT MATCHED THEN INSERT (id, filename, date) VALUES (d.id, d.filename, d.date);

In this example, dedupedLogs is the table that contains the de-duplicated data for the past 7 days. We use the ROW_NUMBER() window function to assign a row number to each record within a group of records with the same id, ordered by the date column in descending order. We then select only the records with rownum = 1 to get the most recent record for each id. The MERGE INTO statement matches records in logs with records in d using the id column, and filters them by the date column. If a match is found, the filename column in logs is updated with the value from d. If there is no match, a new record is inserted into logs with the values from d.
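On the original DLT question: the usual DLT-native counterpart of this MERGE is APPLY CHANGES INTO, the SQL form of the apply_changes() API mentioned above; it upserts by key and uses SEQUENCE BY to keep the latest row per key. A minimal sketch, assuming dedupedLogs is defined as a streaming table or view in the same pipeline (syntax shown for newer runtimes; older releases spell the first statement CREATE OR REFRESH STREAMING LIVE TABLE):

-- Sketch only: target and source names mirror the thread, but the
-- pipeline wiring is an assumption, not the poster's actual code.
CREATE OR REFRESH STREAMING TABLE logs;

APPLY CHANGES INTO live.logs
FROM STREAM(live.dedupedLogs)
KEYS (id)
SEQUENCE BY date
STORED AS SCD TYPE 1;  -- type 1: keep only the latest row per id

One thing worth checking for the "upstream has changed" error: APPLY CHANGES needs an append-only streaming source, so if dedupedLogs is recomputed as a complete (batch) table on each update, DLT can complain that the upstream data has changed.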

Anonymous
Not applicable

Hi @Jordan Fox

Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
