cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
yas_mokri
New Contributor II

Introduction

Deleting specific data from our tables may be necessary for various reasons, such as complying with regulations like The General Data Protection Regulation (GDPR) and California Consumer Privacy Act (CCPA), which require companies to remove user data within a set timeframe. It is essential to ensure that this data is removed from all relevant tables, especially when dealing with sensitive information such as personally identifiable information (PII).

Delta Live Tables (DLT), a popular Databricks product, simplifies building and running ETL pipelines, especially for streaming workloads. While some methods for propagating deletes when using DLT are covered in the blog post Handling "Right to be Forgotten" in GDPR and CCPA using Delta Live Tables (DLT), I will be exploring a new approach using the Change Data Feed (CDF) feature, recently enabled for DLT streaming tables. This feature allows us to propagate deletes more efficiently throughout the pipeline.

In this post, I’ll show you how to delete data from an upstream table and use the Delta CDF and apply_changes to propagate those deletes.

Reference architecture

In this example, the pipeline starts with the user_bronze table, which ingests data from the source. When a row is deleted from this table, we will use the CDF along with the apply_changes function to propagate the deletion to the downstream user_silver table, which is a streaming table and the target of apply_changes. Finally, the user_gold table, a materialised view that summarises the data, reflects the updates from user_silver. The following sections break down each step in detail.

GDPR-blog.jpeg

 

Delete the data from the bronze table

The first table in the pipeline, where data from the source is stored, is called user_bronze, and it’s defined as a streaming table loading data incrementally from a Kafka topic

 @Dlt.table()
 def user_bronze:
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", "topic1")
        .load()
    )

Now, we need to delete one of the rows from this table - which we want to subsequently be reflected in all downstream tables. We can run DML statements, including DELETE, on a DLT streaming table. To delete a row with a specific user_email from the user_bronze table, we can use this statement, which runs outside DLT (e.g., from a notebook).

DELETE FROM user_bronze WHERE user_email = "user1@mail.com"

This type of efficient point deletion is possible thanks to ACID transactions in Delta, along with optimisations like deletion vectors, data skipping using Z-ordering, and, more recently, liquid clustering.

Note, that this command only removes the data from the table, not from the underlying parquet files. I’ll cover how to remove data from the files in the section on Vacuum and Deletion Vectors, which is important for cases like GDPR compliance.

How to propagate deletes with change data feed (CDF)

After deleting data from the most upstream table (user_bronze in our example), the delete needs to be applied to all downstream tables as well.

Previously, methods discussed in the blog post Handling "Right to be Forgotten" in GDPR and CCPA using Delta Live Tables (DLT) included performing a full refresh of downstream streaming tables or manually implementing deletes for each streaming table. While these work, they aren’t always efficient or require extra processes to remove rows from multiple locations.

Now, with Delta CDF available for DLT streaming tables, we can take a more streamlined approach for these use cases using the apply_changes API. In cases where we cannot use this function, we still need to rely on the earlier methods. I'll provide more details on this later in the blog post, but for now, let's dive into how we can use CDF to improve on those previous approaches.

Change Data Feed (CDF)

When a streaming table is updated or data is deleted, it’s impossible to incrementally retrieve those changes via streaming since we can only stream from an append-only table—any updates or deletes in the source table would break the pipeline.

CDF solves this by allowing us to incrementally retrieve all changes to a Delta table, including updates and deletes, and propagate them downstream.

This feature is now enabled for DLT streaming tables by default. This includes the streaming tables that are the target of apply_changes command.

Propagating deletes using CDF and apply_changes

The apply_changes function in DLT can delete a row from the target table if the corresponding row is deleted from the source. For example, when processing data from Bronze to Silver, we can read CDF from the Bronze table and use apply_changes on the target table with delete handling enabled. This is done by using the apply_as_deletes option, which specifies the criteria for triggering a delete. With CDF enabled on the source streaming table, deletes are marked by setting _change_type to delete in the CDF data.

To read CDF data from a source table (e.g., the user_bronze table), here’s the approach:

 @Dlt.view()
 def cdf_user_bronze():
    df = spark.readStream.option("readChangeFeed", "true").table(f"LIVE.user_bronze")
    return df

Note, currently, we need to use spark.readStream to read the CDF data instead of dlt.read_stream. However, we must ensure to use the LIVE keyword so that DLT handles the dependencies between the bronze table (user_bronze) and the CDF view (cdf_user_bronze).

The apply_changes statement will look like this, for our target table user_silver:

create_streaming_table("user_silver")

apply_changes(
  target = "user_silver",
  source = "cdf_user_bronze",
  keys = ["user_id"],
  sequence_by = struct('user_timestamp','_commit_version'),
  except_column_list = ["_change_type", "_commit_version", "_commit_timestamp"],
  apply_as_deletes = expr("_change_type = 'delete'"),
  stored_as_scd_type = 1
)

I have included both user_timestamp and _commit_version so that if there are multiple rows in the CDF data for the same user_id and user_timestamp (such as both INSERTs and UPDATEs), the rows are applied in the correct order based on the _commit_version.

The except_column_list prevents these certain columns from being stored in the target table, as including them would prevent DLT from turning on CDF for the table, resulting in this warning message:

Screenshot 2024-09-19 at 11.12.46 AM.png

When using apply_changes to delete rows, keep in mind that with SCD Type 2, the existing row is only marked as expired in the target table. So, if full removal of a row is required, such as for strict GDPR compliance, SCD Type 1 must be used to guarantee the row is completely deleted.

With apply_changes out-of-order data is handled by temporarily retaining the deleted row as a tombstone in the underlying Delta table. A view is automatically created in the metastore to filter out these tombstones, so they aren't visible in queries.

It is possible to configure how long these tombstones are retained using the below settings:

  • pipelines.cdc.tombstoneGCThresholdInSeconds
  • pipelines.applyChanges.tombstoneGCFrequencyInSeconds

Currently, if these downstream tables are also part of a DLT pipeline, we'll need to use the preview channel when setting up the DLT pipeline. This is because the CDF for the target of apply_changes is only available in DBR 15.2 and above, which is - at the time of writing - supported in the preview channel. The DLT release notes document provides the latest versions of both the current and preview channels for future reference.

Propagate deletes for append-only streaming tables 

So far, I've discussed propagating deletes for tables designed to use the apply_changes API, where apply_changes was already an integral part of the pipeline for updating target tables. When working with append-only streaming tables, it's possible to use CDF and switch to apply_changes in some cases, but not in others. The two scenarios are:

Propagating deletes for datasets with unique keys

If our dataset doesn’t contain duplicate keys, we can use apply_changes with SCD Type 1 while maintaining an append-only table (since no updates are necessary). This approach allows us to propagate deletes using the method described earlier. However, apply_changes is more computationally expensive compared to a simple append, so it's important to consider its potential impact on data processing latency.

Propagate deletes when keys can be duplicated 

If our dataset contains duplicate keys and we intend to append new rows without updating existing ones, apply_changes won't work for propagating deletes, as it updates existing data rather than appending. In this case, we can follow Solution 3 from the Handling "Right to be Forgotten" in GDPR and CCPA using Delta Live Tables (DLT) blog post, either by running individual DELETEs on each streaming table or performing a full refresh. 

Propagate deletes for materialised views

The gold table in our example is a materialised view defined as below. 

CREATE MATERIALIZED VIEW user_gold AS
WITH PercentileCTE AS (
   SELECT
       user_id,
       visits,
       PERCENTILE_CONT(0.9) WITHIN GROUP (ORDER BY visits) OVER () AS percentile_90
   FROM
       LIVE.user_silver
)
SELECT
   user_id,
   visits
FROM
   PercentileCTE
WHERE
   visits > percentile_90;

As mentioned in Handling "Right to be Forgotten" in GDPR and CCPA using Delta Live Tables (DLT), deletes can be propagated using materialised views (MVs) since MVs always reflect the latest data from their upstream sources. If a delete occurs upstream, it will be reflected in the MVs during their refresh.

If MVs and their upstream tables are part of the same DLT pipeline, DLT automatically handles MV refreshes whenever there are changes in the upstream tables. However, if MVs are in a separate pipeline, they need to be updated by scheduling Databricks jobs to run the pipelines sequentially.

Keep in mind that it is not possible to stream or read CDF from an MV, so any downstream table of the MVs must also be an MV to propagate deletes.

Remove data from storage 

Delta tables retain a history to support time travel and table restoration. This means that even after a delete or update, the underlying data is still kept for a certain period, even though it’s no longer visible in the current version of the table.

To ensure data is fully removed within the required timeframe, especially for compliance purposes, it's necessary to configure the table's history retention period accordingly and run VACUUM to remove any files containing deleted data from object storage. The VACUUM operation removes files that are no longer relevant to the current state of the Delta table. For details on configuring the retention period, refer to  this documentation. It is recommended that the retention period be set to at least 7 days. For DLT, this can be set up using table properties when defining a table.

Even though the recommended approach for tasks like VACUUM on Unity Catalog (UC) is to allow predictive optimisation to manage them, this is currently not supported for DLT (see the limitations here). However, VACUUM is performed as part of DLT maintenance tasks. Note that these maintenance tasks occur within 24 hours of a table being updated and are performed only if a pipeline update has run within the 24 hours before the maintenance tasks are scheduled. So, if a row is deleted using DML, it’s essential that the relevant DLT pipelines run within the necessary timeframe.

When enabling deletion vectors (DVs) on a table, it's important to remember that DVs are designed to optimise writes by adding metadata that marks data as deleted rather than actually removing it from the underlying parquet files. If we need to physically remove data from cloud storage for compliance purposes, such as with GDPR, we’ll need to run REORG to rewrite the files and apply the deletes. However, for the target tables of apply_changes, we currently don’t have access to the underlying tables to run REORG. Therefore, it’s recommended not to enable DVs on tables that are the target of apply_changes if full deletion from cloud storage is required.

Conclusion

In this post, we have explored how to efficiently propagate deletes across Delta Live Tables (DLT) pipelines using Change Data Feed (CDF) and apply_changes. We can streamline the propagation of deletes using this method if the tables in our pipeline can be updated using apply_changes. This includes streaming pipelines where, by design, we want to update a table using SCD Type 1 or SCD Type 2 methods. It also works for streaming tables that are append-only (no updates needed), but since there are no duplicate keys, using apply_changes still results in an append-only table.

For append-only streaming tables with duplicate keys or when performance is a concern, we must rely on the methods outlined Handling "Right to be Forgotten" in GDPR and CCPA using Delta Live Tables (DLT) to propagate deletes.

If it is necessary to remove data from both the underlying files and the tables, we should employ SCD Type 1 and adjust the Delta history retention period and VACUUM accordingly.

Contributors