Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Incremental load from source; how to handle deletes

_TJ
New Contributor III
A short introduction: I'm a BI/data developer, mostly working with MSSQL and Data Factory, coming from SSIS. I'm now trying Databricks to see whether it works for me and my customers. This video got me enthusiastic: https://www.youtube.com/watch?v=PIFL7W3DmaY&t=4652s, and I'm now on the waiting list for streaming tables and materialized views. My apologies for what are probably some newbie questions, but sometimes it's hard to find the right words to search for :)
 
I'm building a simplified PoC, with Sales Order Lines for an ERP system.
 
I'm loading Parquet files into Azure ADLS, with this folder structure:

- Source system
-->Table (Orderlines)

---->Delta
------>2024-02-02
------>2024-02-03
---->Full
------>2024-02-01
 
 
Usually there is a full load once a week and an incremental load daily.
I was planning to use a STREAMING TABLE with APPLY_CHANGES to load the data. Now I'm wondering how to configure the 'merge/upsert' part in the following two scenarios. We can do an incremental load from the source system in two ways: by date (fetch all order lines from the last 5 days) or by ID (fetch all order lines for the order IDs that have changed in the last 5 days). The problem lies in how to handle deletes in the source system.
 
In the first case, it must delete all target records from the given date onward that are not present in the incremental load.
In the second case, it must delete all target records that belong to the order IDs in the incremental load but no longer exist in the source.
 
 
Currently we do this in SQL. It's a bit of pseudo-code, but I hope you get the idea.
 
Case 1 - By date:
 
SELECT @date = MIN(Date) FROM Source
DELETE FROM Target WHERE Date >= @date
INSERT INTO Target SELECT * FROM Source
 
Case 2 - By ID:
 
DELETE FROM Target WHERE ID IN (SELECT ID from Source)
INSERT INTO Target SELECT * FROM Source
 
 
Target stays up-to-date, and records that have been removed are deleted.
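
For anyone who wants to try the two patterns above outside SQL Server, here is a minimal runnable sketch using Python's built-in sqlite3 module. The table layout (order_id, line_no, order_date, qty), the function names, and the sample rows are all invented for illustration; they are not taken from the ERP system. It only models the delete-then-insert logic and says nothing about how DLT would do this. Case 1 uses >= so that rows on the earliest loaded date are replaced as well.

```python
import sqlite3

INSERT_SQL = "INSERT INTO target VALUES (?, ?, ?, ?)"

def replace_by_date(conn, rows):
    """Case 1: replace every target row dated on or after the earliest date in the load."""
    if not rows:
        return
    min_date = min(r[2] for r in rows)  # ISO dates compare correctly as strings
    with conn:  # one transaction: delete and insert succeed or fail together
        conn.execute("DELETE FROM target WHERE order_date >= ?", (min_date,))
        conn.executemany(INSERT_SQL, rows)

def replace_by_order_id(conn, rows):
    """Case 2: replace every target row whose order ID appears in the load."""
    ids = sorted({r[0] for r in rows})
    with conn:
        conn.executemany("DELETE FROM target WHERE order_id = ?", [(i,) for i in ids])
        conn.executemany(INSERT_SQL, rows)

def make_target():
    """Build an in-memory target table with made-up sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE target (order_id INTEGER, line_no INTEGER, order_date TEXT, qty INTEGER)"
    )
    conn.executemany(INSERT_SQL, [
        (1, 1, "2024-01-10", 5),   # older than any load below: must be kept
        (2, 1, "2024-02-01", 3),
        (2, 2, "2024-02-01", 7),   # deleted in the source system
        (3, 1, "2024-02-02", 1),
    ])
    conn.commit()
    return conn

def snapshot(conn):
    """Return the target rows in a stable order."""
    return conn.execute(
        "SELECT order_id, line_no, order_date, qty FROM target ORDER BY order_id, line_no"
    ).fetchall()

# Case 1: the load contains everything from 2024-02-01 onward.
by_date = make_target()
replace_by_date(by_date, [(2, 1, "2024-02-01", 4), (3, 1, "2024-02-02", 1)])
print(snapshot(by_date))

# Case 2: the load contains all current lines of the changed order 2.
by_id = make_target()
replace_by_order_id(by_id, [(2, 1, "2024-02-01", 4)])
print(snapshot(by_id))
```

In both runs, line 2 of order 2 disappears from the target while order 1, which is outside the load, stays untouched.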
 
Is this possible in DLT? I can't find it in the documentation, which only says that the keys must be unique, and that is not the case here.
 
I hope I'm missing something crucial, since I believe this is a fairly common issue: incremental loading from a source system where rows are sometimes deleted. Can anyone point me in the right direction?

 

4 REPLIES

Kaniz_Fatma
Community Manager

Hi @_TJ, thank you for sharing your background and your interest in Databricks! Let’s dive into your scenarios and explore how you can achieve similar functionality using Databricks Delta Live Tables (DLT).

  1. Streaming Tables and Materialized Views:

  2. Handling Incremental Loads and Deletes:

    • Let’s address your two scenarios:

    Case 1: Incremental Load by Date

    • To achieve this, you can create a streaming table that ingests data from your source system based on the date. You’ll use the APPLY_CHANGES operation to merge the changes into your target table.

    • When new data arrives, DLT will automatically apply the changes (inserts, updates, and deletes) to your target table. Records with dates older than the given date will be deleted from the target table.

    • Here’s a high-level example of how you might set this up:

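      -- Create a streaming table that ingests the Parquet files with Auto Loader
      CREATE OR REFRESH STREAMING TABLE my_streaming_table
      AS SELECT * FROM cloud_files('/path/to/parquet/files', 'parquet');
      
      -- Declare the target table, then apply changes to it
      -- (order_line_id and load_date are placeholder column names)
      CREATE OR REFRESH STREAMING TABLE my_target_table;
      
      APPLY CHANGES INTO LIVE.my_target_table
      FROM STREAM(LIVE.my_streaming_table)
      KEYS (order_line_id)
      SEQUENCE BY load_date;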
      

    Case 2: Incremental Load by ID

    • For this scenario, you’ll follow a similar approach. Create a streaming table that ingests data based on order IDs. Again, use APPLY_CHANGES to merge the changes.

    • When new data arrives, DLT will handle inserts, updates, and deletes. Records with order IDs that no longer exist will be deleted from the target table.

    • Example:

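      -- Create another streaming table that ingests the Parquet files with Auto Loader
      CREATE OR REFRESH STREAMING TABLE my_streaming_table_by_id
      AS SELECT * FROM cloud_files('/path/to/parquet/files', 'parquet');
      
      -- Apply changes to the target table
      -- (my_target_table must be declared with CREATE OR REFRESH STREAMING TABLE;
      --  order_line_id and load_date are placeholder column names)
      APPLY CHANGES INTO LIVE.my_target_table
      FROM STREAM(LIVE.my_streaming_table_by_id)
      KEYS (order_line_id)
      SEQUENCE BY load_date;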
      
  3. Unique Keys:

    • You mentioned that the keys are not unique. While DLT requires unique keys for primary key enforcement, you can still achieve your desired behavior.
    • Consider creating a synthetic unique key (e.g., concatenating order ID and date) to use as the primary key in your target table. This way, DLT can handle updates and deletes correctly.
  4. Refreshing Materialized Views:

    • Materialized views in DLT are powerful for aggregations and transformations. You can refresh them explicitly using the REFRESH statement.

    • For example:

      -- Create a materialized view
      CREATE MATERIALIZED VIEW mv1
      AS SELECT order_id, SUM(amount) AS total_amount
      FROM my_target_table
      GROUP BY order_id;
      
      -- Refresh the materialized view
      REFRESH MATERIALIZED VIEW mv1;
      
    • DLT will automatically update the materialized view as new data arrives.

In summary, DLT provides the flexibility and power to handle incremental loads, deletes, and materialized views. While unique keys are essential, you can work around them by creating synthetic keys. Good luck with your PoC, and feel free to ask any further questions!

 

_TJ
New Contributor III

Hi @Kaniz_Fatma 

First of all, thanks for your comprehensive answer. Although I appreciate your input, it doesn't address my problem: I don't want to delete all data from the previous load, only the data that 'overlaps' my new load.

Case 1
In this case, I load data from the source system using a 'sliding' date window: I fetch all rows from (let's say) the last month, because rows can be changed or removed within that month. So I want to keep everything already loaded from before the last month, and delete everything from the last month onward (because those rows are in my new load).

Case 2
The same goes for this case: I don't want to delete the older IDs that are missing from the load. They still exist in the source; they are just not in my incremental load, because I only fetch the order lines for the IDs that have changed. So it must keep all IDs from prior loads, and delete all rows for the IDs that are in the new incremental load before inserting them again.

How can DLT handle these deletes? I can't see how a synthetic key could help in these cases.
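
A small sample may make the difference clearer. The sketch below is plain Python, not DLT code: upsert_by_key is only a stand-in for a generic merge on a unique key, and replace_by_order is the behaviour described in Case 2. The (order_id, line_no) keys and the quantities are made-up sample data.

```python
def upsert_by_key(target, load):
    """Keyed merge: insert or update each loaded row; rows absent from the load are left alone."""
    merged = dict(target)
    merged.update(load)
    return merged

def replace_by_order(target, load):
    """Scope-based replace: drop every row of the orders present in the load, then insert the load."""
    loaded_orders = {order_id for (order_id, _line_no) in load}
    kept = {key: qty for key, qty in target.items() if key[0] not in loaded_orders}
    kept.update(load)
    return kept

# Keys are (order_id, line_no); values are quantities.
target = {(1, 1): 5, (2, 1): 3, (2, 2): 7}
# Order 2 changed at the source: line 1 was updated and line 2 was deleted.
load = {(2, 1): 4}

print(upsert_by_key(target, load))     # {(1, 1): 5, (2, 1): 4, (2, 2): 7}
print(replace_by_order(target, load))  # {(1, 1): 5, (2, 1): 4}
```

With the keyed merge, the deleted line (2, 2) survives because nothing in the load mentions it, however the key is built. The scope-based replace removes it while keeping order 1, which was not part of the load. Case 1 works the same way, with the date window as the scope instead of the order IDs.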

Do you understand my problem? Otherwise, I will prepare some samples.

_TJ
New Contributor III

Does anyone have an idea?
@Kaniz_Fatma Could you tell me if it's possible?

abajpai
New Contributor II

@_TJ did you find a solution for the sliding window?
