DLT - Delta Live Tables & Handling De-Duplication from Source Data

thedatacrew
New Contributor III

Hello,

Could anyone please help regarding the scenario below?

Scenario

• I'm using the DLT SQL language.
• Parquet files are landed each day from a source system.
• Each day's file contains the previous 7 days of data. The source system can have very late-arriving data and can be unstable from time to time.
• Much of the data is therefore duplicated: rows are either exactly identical or are newer versions of the same record, distinguished by a modified timestamp.
• I'm reading the stream using cloud_files into a bronze table.

Requirements

• I need to de-duplicate the streamed data from landing to bronze to keep the size down.

Problem

• Window functions, e.g. "row_number() over (partition by row_id order by date_modified desc) as row_ver", are not supported in structured streaming tables.
• DISTINCT would only capture exact duplicated rows.
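To make the gap between the two points above concrete, here is a minimal pure-Python sketch (not Spark or DLT code) contrasting exact-duplicate removal with "latest row per key". The column names row_id and date_modified come from the post; the sample rows, the amount column and the function names are hypothetical.

```python
from datetime import datetime

# Hypothetical sample rows; only row_id and date_modified come from the post.
rows = [
    {"row_id": 1, "amount": 10, "date_modified": datetime(2024, 1, 1)},
    {"row_id": 1, "amount": 10, "date_modified": datetime(2024, 1, 1)},  # exact duplicate
    {"row_id": 1, "amount": 12, "date_modified": datetime(2024, 1, 3)},  # updated version
    {"row_id": 2, "amount": 7, "date_modified": datetime(2024, 1, 2)},
]


def distinct(rows):
    """Drop only rows that are identical in every column (like SELECT DISTINCT)."""
    seen = set()
    out = []
    for row in rows:
        fingerprint = tuple(sorted(row.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            out.append(row)
    return out


def latest_per_key(rows, key="row_id", order_by="date_modified"):
    """Keep one row per key: the one with the greatest order_by value.

    This is the effect of keeping row_ver = 1 from
    row_number() over (partition by key order by order_by desc).
    """
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


distinct_rows = distinct(rows)      # the updated version of row_id 1 survives alongside the old one
latest_rows = latest_per_key(rows)  # exactly one row per row_id
print(len(distinct_rows), len(latest_rows))
```

With this sample, distinct still leaves two versions of row_id 1, while latest_per_key leaves only the most recently modified one.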

Question

Previously, with a streaming merge, I could use foreachBatch to de-duplicate each batch before merging it into the target. That worked quite well, although it was a bit fiddly.

• What are the guidelines for de-duplicating streaming data in Delta Live Tables?
• Is there a way to do this in DLT Python, i.e. dropDuplicates?
• What is the community doing about data de-duplication from streaming sources?

Any advice or guidance is much appreciated.

Thank you.

4 REPLIES

Lakshay
Esteemed Contributor

This looks like a good use case for "change data feed" in DLT. You can read more about it here: https://docs.databricks.com/en/delta/delta-change-data-feed.html

thedatacrew
New Contributor III

Thanks. I'm already using CDC and streaming with foreachBatch() to de-duplicate incoming data, but I'm trying to achieve the same behaviour with Delta Live Tables. Is there any support for this in DLT (SQL or PySpark)?

Lakshay
Esteemed Contributor

Yes, it is available in DLT. Check this document: https://docs.databricks.com/en/delta-live-tables/cdc.html

thedatacrew
New Contributor III


Hi, thanks for the suggestion.

Performance is dreadful when writing from cloud_files using APPLY CHANGES INTO (61M rows). So I have loaded bronze including the duplicates, and used the silver layer to de-duplicate using APPLY CHANGES INTO.

• headers: 6.5M rows
• items: 61M rows

The pipeline executes but, on a full refresh, never finishes: using 1 worker & 4 executors, it runs out of memory after about 30 minutes. There are no issues with structured streaming and Delta. I'm pretty sure the SQL below should work, but I'm not having much luck. The join seems to be killing it.

CREATE STREAMING TABLE sales_header(
  sales_header_bk string,
  date_created timestamp,
  date_month_closed date,
  ...more fields....
  CONSTRAINT valid_site_bk EXPECT (sales_header_bk IS NOT NULL) ON VIOLATION FAIL UPDATE
) 
PARTITIONED BY (date_month_closed) 
COMMENT "The cleaned sales header valid sales_header_bk. Partitioned by date_month_closed." 
TBLPROPERTIES ("quality" = "silver");

-- The target must be the streaming table declared above (sales_header).
APPLY CHANGES INTO LIVE.sales_header
FROM
  (
    SELECT
      concat_ws('-', h.site_id, h.check_item_id, h.split_number) as sales_header_bk,
        ...more fields....
    FROM
      STREAM(dev_bronze.quadranet.sales_masterdata_header) h
  ) KEYS (sales_header_bk) SEQUENCE BY date_created;



CREATE STREAMING TABLE sales_item(
  sales_item_bk string,
  sales_header_bk string,
  ...more fields....
  date_created timestamp,
  date_loaded_utc timestamp,
  date_closed timestamp,
  date_closed_adjusted int,
  date_month_closed int,
  date_id_closed int,
  date_id_closed_adjusted int,
  CONSTRAINT valid_site_id EXPECT (sales_item_bk IS NOT NULL) ON VIOLATION FAIL UPDATE
)
PARTITIONED BY (date_month_closed)
COMMENT "The cleaned sales items with valid sales_header_bk. Partitioned by date_month_closed."
TBLPROPERTIES ("quality" = "silver");

APPLY CHANGES INTO LIVE.sales_item
FROM
  (
    SELECT
      i.key as sales_item_bk,
      concat_ws('-', i.site_id, i.check_item_id, i.split_number) as sales_header_bk,
      i.date_created,
      ...more fields....
      h.date_closed,
      h.date_closed_adjusted,
      h.date_month_closed,
      h.date_id_closed,
      h.date_id_closed_adjusted
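    FROM
      STREAM(bronze.sales_item) i
      -- Join on the header business key (assumed intent): sales_header declares sales_header_bk, not sales_item_bk.
      INNER JOIN STREAM(LIVE.sales_header) h
        ON concat_ws('-', i.site_id, i.check_item_id, i.split_number) = h.sales_header_bk
  ) KEYS (sales_item_bk) SEQUENCE BY date_created;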

The SQL above is a sample (there are a lot more fields).
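For readers following the thread, the de-duplication effect of the KEYS and SEQUENCE BY clauses in these statements can be modelled roughly as below. This is a minimal pure-Python sketch under stated assumptions, not the DLT implementation: apply_changes, the status column and the sample values are hypothetical, integers stand in for date_created timestamps, and keeping the stored row on a sequence tie is a modelling choice.

```python
def apply_changes(target, batch, key, sequence_by):
    """Upsert batch rows into target (a dict indexed by the key column).

    A row replaces the stored row for its key only if its sequence value is
    strictly greater, so re-sent duplicates and late, older versions are ignored.
    """
    for row in batch:
        current = target.get(row[key])
        if current is None or row[sequence_by] > current[sequence_by]:
            target[row[key]] = row
    return target


# Hypothetical daily loads that overlap, as in the 7-day rolling extract.
day1 = [
    {"sales_header_bk": "1-100-0", "date_created": 1, "status": "open"},
    {"sales_header_bk": "1-100-0", "date_created": 3, "status": "closed"},
]
day2 = [
    {"sales_header_bk": "1-100-0", "date_created": 3, "status": "closed"},   # re-sent duplicate
    {"sales_header_bk": "1-100-0", "date_created": 2, "status": "amended"},  # late, older version
    {"sales_header_bk": "1-101-0", "date_created": 2, "status": "open"},     # new key
]

target = {}
apply_changes(target, day1, key="sales_header_bk", sequence_by="date_created")
apply_changes(target, day2, key="sales_header_bk", sequence_by="date_created")

for bk, row in target.items():
    print(bk, row["date_created"], row["status"])
```

After both loads the target holds one row per sales_header_bk, and the late "amended" row does not overwrite the newer "closed" one.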
