โ08-07-2024 06:02 AM
Hi, didn't find any "reasonable" way to clean old data from DLT pipeline tables. In DLT we have used materialized views and streaming tables (scd1, append only). What is the best way to delete old data from the tables (storage size increases linearly, of course new measures comes every day and old data is not deleted... yet). Let say that we want do delete all measurement which measurement timestamp is older than one month.
โ08-23-2024 01:16 AM
This approach doesn't work with DLT, because the manual DELETE command immediately causes pipeline corruption. We have to use skipChangeCommits=True, but this option has several limitations, at least it cannot be used with apply_changes(). Is there any other approach to cleaning up old data in DLT?
โ08-23-2024 02:14 AM
Exactly, this is not a "trivial problem", one possible solution is take bronze out of DLT pipeline (to manage by yourself, for example structured streaming from source with skipChangeCommits and partitioned by year/month, what ever you want to do to make delete handling optional), then define silver layer in DLT with "full refresh/overwrite". Of course this not fit in all situations. Just gave some idea.
โ08-23-2024 02:35 AM - edited โ08-23-2024 02:37 AM
One effective way is to use partitioning in your Delta tables based on the timestamp of your data. This way, you can take advantage of Delta Lakeโs Vacuum command to remove old files and reduce the size of your storage.
also vacuum and optimise command can be run on delta live Table.
โ09-20-2024 08:16 AM
If you do a full refresh on that streaming table sourc, that should remove old data. I am assuming you are feeding this in an scd type 1 which overwrites the data.
a month ago
1. Enable Change Data Capture (CDC):
Enable CDC before deleting data to ensure Delta tables track inserts, updates, and deletes. This allows downstream pipelines to handle deletions correctly.
ALTER TABLE your_table SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
2. Delete Old Data:
Delete rows older than one month based on the measurement_timestamp column.
DELETE FROM your_table WHERE measurement_timestamp < current_date() - INTERVAL 1 MONTH;
3. Vacuum the Table:
Cleanup deleted data and free up storage by running a VACUUM command.
VACUUM your_table RETAIN 7 HOURS;
4. Downstream Query Considerations:
Use table_changes to process incremental changes in downstream pipelines.
โข Batch Query Example:
SELECT *
FROM table_changes('your_table')
WHERE _change_type IN ('insert', 'update');
โข Streaming Query with DLT Example:
@dlt.view
def source_table_changes():
return spark.readStream.format('delta') \
.option('readChangeFeed', 'true') \
.table('your_table') \
.where("_change_type IN ('insert', 'update')")
dlt.create_streaming_table("downstream_table")
dlt.apply_changes(
target="downstream_table",
source="source_table_changes",
keys=['unique_key'], # Primary key column(s)
sequence_by='measurement_timestamp' # Sequence column for updates
)
Summary
1. Enable CDC on the source table using delta.enableChangeDataFeed.
2. Periodically delete old data and vacuum the source table to manage storage.
3. Use apply_changes or table_changes to process updates, inserts, and deletes into downstream tables.
4. Rely on Delta Live Tables (DLT) to ensure consistency and incremental processing.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group