Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

The best practice to remove old data from DLT pipeline created tables

LasseL
New Contributor II

Hi, I couldn't find any "reasonable" way to clean old data out of tables created by a DLT pipeline. In DLT we use materialized views and streaming tables (SCD type 1, append-only). What is the best way to delete old data from these tables? Storage grows linearly, since new measurements arrive every day and old data is never deleted (yet). Let's say we want to delete all measurements whose measurement timestamp is older than one month.

5 REPLIES

This approach doesn't work with DLT, because a manual DELETE command immediately causes pipeline corruption. We would have to use skipChangeCommits=True, but this option has several limitations; at the very least, it cannot be used with apply_changes(). Is there any other approach to cleaning up old data in DLT?

LasseL
New Contributor II

Exactly, this is not a "trivial problem". One possible solution is to take bronze out of the DLT pipeline and manage it yourself (for example, Structured Streaming from the source with skipChangeCommits, partitioned by year/month, or whatever else makes delete handling optional), then define the silver layer in DLT with a "full refresh/overwrite". Of course this doesn't fit every situation; it's just an idea.
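A minimal sketch of the self-managed bronze read described above, assuming a Delta table named `bronze.measurements` (a hypothetical name) that lives outside the pipeline and has old rows deleted by a separate job. The `skipChangeCommits` option asks the stream to ignore commits that delete or modify existing rows, so a retention DELETE should not stop the reader. The function name is made up for illustration, and the Spark session is passed in as an argument.

```python
from typing import Any


def read_bronze_ignoring_deletes(spark: Any, table_name: str = "bronze.measurements") -> Any:
    """Return a streaming DataFrame over a Delta table that is cleaned up outside DLT.

    `spark` is an active SparkSession; `table_name` is a hypothetical table name.
    """
    return (
        spark.readStream
        # Ignore commits that delete or modify existing rows (for example a
        # retention DELETE) instead of failing on a non-append change.
        .option("skipChangeCommits", "true")
        .table(table_name)
    )
```

Because the skipped commits are never propagated, rows deleted from bronze remain in anything built from this stream until that downstream table is refreshed, which is why the silver layer is rebuilt with a full refresh in this design.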

Rishabh-Pandey
Esteemed Contributor

One effective way is to partition your Delta tables by the timestamp of your data. You can then use Delta Lake's VACUUM command to remove old files that the table no longer references and reduce the size of your storage.

Also, the VACUUM and OPTIMIZE commands can be run on Delta Live Tables.

Rishabh Pandey

edman
New Contributor II

If you do a full refresh on that streaming table source, that should remove old data. I am assuming you are feeding this into an SCD type 1 table, which overwrites the data.
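For reference, a full refresh can also be started programmatically. The sketch below only builds the HTTP request for the Databricks Pipelines REST API (`POST /api/2.0/pipelines/{pipeline_id}/updates`) and does not send it; the function name, workspace host, pipeline ID, and token are placeholders, not values from this thread.

```python
import json
import urllib.request
from typing import List, Optional


def build_full_refresh_request(host: str, pipeline_id: str, token: str,
                               tables: Optional[List[str]] = None) -> urllib.request.Request:
    """Build (but do not send) a request that starts a pipeline update with a full refresh.

    `host`, `pipeline_id`, and `token` are placeholders supplied by the caller.
    """
    # Refresh every table, or only the named tables when `tables` is given.
    body = {"full_refresh_selection": tables} if tables else {"full_refresh": True}
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.0/pipelines/{pipeline_id}/updates",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


# Sending is left to the caller, e.g. urllib.request.urlopen(request).
```

Note that a full refresh recomputes the table from whatever the source still holds, so old rows only disappear if the source no longer contains them or the table's query filters them out.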

TinasheChinyati
New Contributor III

@LasseL 

1. Enable Change Data Feed (CDF):
Enable the change data feed before deleting data so that the Delta table records row-level inserts, updates, and deletes. This allows downstream pipelines to handle deletions correctly.

 

ALTER TABLE your_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

 

2. Delete Old Data:
Delete rows older than one month based on the measurement_timestamp column.

 

DELETE FROM your_table WHERE measurement_timestamp < current_date() - INTERVAL 1 MONTH;

 

3. Vacuum the Table:
Clean up deleted data and free up storage by running a VACUUM command.

 

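-- 168 hours = 7 days, the default minimum; shorter windows are rejected unless the retention safety check is disabled
VACUUM your_table RETAIN 168 HOURS;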

 

4. Downstream Query Considerations:
Use table_changes to process incremental changes in downstream pipelines.
• Batch Query Example:

 

-- The second argument is the starting table version (0 here is an assumption)
SELECT *
FROM table_changes('your_table', 0)
WHERE _change_type IN ('insert', 'update_postimage');

 

• Streaming Query with DLT Example:

 

import dlt

@dlt.view
def source_table_changes():
   # Stream the change feed, keeping inserts and the post-update row images
   return spark.readStream.format('delta') \
       .option('readChangeFeed', 'true') \
       .table('your_table') \
       .where("_change_type IN ('insert', 'update_postimage')")

dlt.create_streaming_table("downstream_table")
dlt.apply_changes(
   target="downstream_table",
   source="source_table_changes",
   keys=['unique_key'],  # Primary key column(s)
   sequence_by='measurement_timestamp'  # Sequence column for updates
)

 

Summary

1. Enable the change data feed on the source table using delta.enableChangeDataFeed.
2. Periodically delete old data and vacuum the source table to manage storage.
3. Use apply_changes or table_changes to process updates, inserts, and deletes into downstream tables.
4. Rely on Delta Live Tables (DLT) to ensure consistency and incremental processing.
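
The periodic cleanup in steps 1 to 3 could be wrapped in a small helper that a scheduled job runs. This is only a sketch under a few assumptions: the table is managed outside the DLT pipeline (a manual DELETE on a pipeline-managed table was reported earlier in this thread to break the pipeline), Delta's default 7-day (168-hour) retention safety check is left enabled, and the function name and defaults are made up for illustration.

```python
MIN_SAFE_RETAIN_HOURS = 168  # Delta's default retention safety threshold (7 days)


def retention_statements(table: str, ts_column: str = "measurement_timestamp",
                         months: int = 1, retain_hours: int = MIN_SAFE_RETAIN_HOURS) -> list:
    """Return the SQL statements for one cleanup run, in execution order."""
    if months < 1:
        raise ValueError("months must be at least 1")
    if retain_hours < MIN_SAFE_RETAIN_HOURS:
        # Shorter windows are rejected by Delta unless its retention check is disabled.
        raise ValueError(f"retain_hours must be >= {MIN_SAFE_RETAIN_HOURS}")
    return [
        f"ALTER TABLE {table} SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')",
        f"DELETE FROM {table} WHERE {ts_column} < current_date() - INTERVAL {months} MONTH",
        f"VACUUM {table} RETAIN {retain_hours} HOURS",
    ]


for statement in retention_statements("your_table"):
    print(statement)  # on a cluster: spark.sql(statement)
```

Keeping the statements as plain strings makes the job easy to review and log before anything is executed against the table.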