Data Engineering
The best practice to remove old data from DLT pipeline created tables

LasseL
New Contributor III

Hi, I didn't find any "reasonable" way to clean old data from tables created by a DLT pipeline. In DLT we have used materialized views and streaming tables (SCD1, append-only). What is the best way to delete old data from these tables? Storage size grows linearly, of course, since new measurements arrive every day and old data is never deleted... yet. Let's say we want to delete all measurements whose measurement timestamp is older than one month.

1 ACCEPTED SOLUTION

TinasheChinyati
New Contributor III

@LasseL 

1. Enable Change Data Capture (CDC):
Enable CDC before deleting data to ensure Delta tables track inserts, updates, and deletes. This allows downstream pipelines to handle deletions correctly.

 

ALTER TABLE your_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

 

2. Delete Old Data:
Delete rows older than one month based on the measurement_timestamp column.

 

DELETE FROM your_table WHERE measurement_timestamp < current_date() - INTERVAL 1 MONTH;

 

3. Vacuum the Table:
Clean up the deleted data files and free storage by running a VACUUM command. Note that a 7-hour retention is below Delta's default 7-day threshold, so spark.databricks.delta.retentionDurationCheck.enabled must be set to false for the command below to succeed.

 

VACUUM your_table RETAIN 7 HOURS;

 

4. Downstream Query Considerations:
Use table_changes to process incremental changes in downstream pipelines.
Batch Query Example:

 

SELECT *
FROM table_changes('your_table', 1) -- 1 = starting table version (a timestamp string also works)
WHERE _change_type IN ('insert', 'update');

 

Streaming Query with DLT Example:

 

import dlt

@dlt.view
def source_table_changes():
    return spark.readStream.format('delta') \
        .option('readChangeFeed', 'true') \
        .table('your_table') \
        .where("_change_type IN ('insert', 'update')")

dlt.create_streaming_table("downstream_table")
dlt.apply_changes(
    target="downstream_table",
    source="source_table_changes",
    keys=['unique_key'],  # Primary key column(s)
    sequence_by='measurement_timestamp'  # Sequence column that orders updates
)

 

Summary

1. Enable CDC on the source table using delta.enableChangeDataFeed.
2. Periodically delete old data and vacuum the source table to manage storage.
3. Use apply_changes or table_changes to process updates, inserts, and deletes into downstream tables.
4. Rely on Delta Live Tables (DLT) to ensure consistency and incremental processing.


6 REPLIES

Retired_mod
Esteemed Contributor III

Hi @LasseL, To manage old data in Delta Live Tables (DLT), use a combination of Delta Lake's features and DLT capabilities: start by using Delta Lake's `DELETE` command to remove outdated records, for example, by deleting entries older than one month. Integrate this deletion logic into your DLT pipeline with a Python function that filters out old data. Schedule regular maintenance tasks using Databricks Jobs to automate this process. After deletion, run the `OPTIMIZE` command to enhance query performance and use `VACUUM` to clean up unused files and free up storage space. These steps will help keep your storage usage in check and ensure efficient data management.
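
For illustration, a minimal sketch of that scheduled maintenance, assuming a hypothetical table named measurements with a measurement_timestamp column, run as a Databricks Job outside the DLT update itself (spark here is the session provided by the Databricks runtime):

# Hypothetical table name; adjust to your own schema.
TABLE = "measurements"

# Remove rows older than the retention window.
spark.sql(f"DELETE FROM {TABLE} WHERE measurement_timestamp < current_date() - INTERVAL 1 MONTH")

# Compact the small files left behind by the delete.
spark.sql(f"OPTIMIZE {TABLE}")

# Physically remove files no longer referenced by the table (default retention: 7 days).
spark.sql(f"VACUUM {TABLE}")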

This approach doesn't work with DLT, because the manual DELETE command immediately causes pipeline corruption. We have to use skipChangeCommits=True, but that option has several limitations; for one, it cannot be used with apply_changes(). Is there any other approach to cleaning up old data in DLT?

LasseL
New Contributor III

Exactly, this is not a "trivial problem". One possible solution is to take bronze out of the DLT pipeline and manage it yourself, for example with Structured Streaming from the source using skipChangeCommits and partitioning by year/month, or whatever else makes delete handling practical, and then define the silver layer in DLT with a full refresh/overwrite. Of course this doesn't fit all situations; just an idea.
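
A rough sketch of that split, with hypothetical names (bronze_measurements, silver_measurements) and assuming the raw data lands as files read by Auto Loader: bronze is maintained by a plain Structured Streaming job outside DLT, and silver is recomputed by DLT so the manual deletes on bronze don't break anything:

from pyspark.sql import functions as F

# --- Outside DLT: self-managed bronze, partitioned so old data is cheap to delete ---
(spark.readStream
    .format("cloudFiles")                      # assumption: Auto Loader over a raw landing path
    .option("cloudFiles.format", "json")
    .load("/raw/measurements")
    .withColumn("year", F.year("measurement_timestamp"))
    .withColumn("month", F.month("measurement_timestamp"))
    .writeStream
    .partitionBy("year", "month")
    .option("checkpointLocation", "/checkpoints/bronze_measurements")
    .toTable("bronze_measurements"))

# Periodic DELETE + VACUUM on bronze_measurements now happens outside any DLT update.

# --- Inside DLT: silver is fully recomputed, so it never sees a partial delete ---
import dlt

@dlt.table(name="silver_measurements")
def silver_measurements():
    # Batch read => materialized view that is overwritten on each pipeline update.
    return spark.read.table("bronze_measurements")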

Rishabh-Pandey
Esteemed Contributor

One effective way is to partition your Delta tables by the timestamp of your data. That way you can take advantage of Delta Lake's VACUUM command to remove old files and reduce the size of your storage.

VACUUM and OPTIMIZE can also be run on Delta Live Tables.
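
For example, a sketch of that layout, assuming a hypothetical measurements table partitioned by a date column generated from the measurement timestamp, so deletes line up with whole partitions:

spark.sql("""
    CREATE TABLE IF NOT EXISTS measurements (
        device_id STRING,
        value DOUBLE,
        measurement_timestamp TIMESTAMP,
        measurement_date DATE GENERATED ALWAYS AS (CAST(measurement_timestamp AS DATE))
    )
    USING DELTA
    PARTITIONED BY (measurement_date)
""")

# Old partitions are removed with a partition-aligned DELETE, then VACUUM reclaims the files.
spark.sql("DELETE FROM measurements WHERE measurement_date < current_date() - INTERVAL 1 MONTH")
spark.sql("OPTIMIZE measurements")
spark.sql("VACUUM measurements")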

Rishabh Pandey

edman
New Contributor II

If you do a full refresh on that streaming table source, that should remove the old data. I am assuming you are feeding this as SCD type 1, which overwrites the data.
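
If you want to trigger that full refresh programmatically rather than from the pipeline UI, a minimal sketch using the DLT pipelines REST API (the workspace URL, pipeline ID, and token below are placeholders; full_refresh recomputes all tables in the pipeline):

import requests

host = "https://<your-workspace>.cloud.databricks.com"   # placeholder workspace URL
pipeline_id = "<your-pipeline-id>"                        # placeholder DLT pipeline ID
token = "<personal-access-token>"                         # placeholder access token

# Start a pipeline update with full_refresh=True, which recomputes all tables from scratch.
resp = requests.post(
    f"{host}/api/2.0/pipelines/{pipeline_id}/updates",
    headers={"Authorization": f"Bearer {token}"},
    json={"full_refresh": True},
)
resp.raise_for_status()
print(resp.json())   # response includes the update_id of the triggered run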

