Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Gold aggregation with apply_change

deepu1
New Contributor

I am building a Gold table using Delta Live Tables (DLT). The Gold table contains aggregated data derived from a Silver table, and aggregation happens monthly. The requirement is that only the current (year, month) should be recalculated; previous months must remain unchanged even if the Silver data changes.

If I use @dlt.table, the entire aggregation is recomputed on every run.

I tried using dlt.apply_changes, but I ran into errors because my filtered DataFrame contains only the data for the month being processed, not the complete dataset.

 

Below is my apply_changes framework.

import dlt
from pyspark.sql.functions import col

def apply_changes_to_table(table_name, compute_fn, apply_changes_keys):
    table_schema, table_description = load_schema_from_yaml(table_name)
    processed_source_table = f"{table_name}_staging"

    @dlt.table(name=processed_source_table)
    def create_processed_source():
        return compute_fn()  # Already filtered to the processing month

    dlt.create_streaming_table(
        name=table_name,
        comment=table_description,
        schema=table_schema,
    )

    dlt.create_auto_cdc_flow(
        target=table_name,
        source=processed_source_table,
        keys=apply_changes_keys,
        sequence_by=col("updated_timestamp"),
        stored_as_scd_type=1
    )
 
I want to understand: is it possible to do partial (current-month-only) aggregation updates in the Gold layer using apply_changes?

 

1 ACCEPTED SOLUTION

Accepted Solutions

aleksandra_ch
Databricks Employee

Hi @deepu1 ,

Assuming that @dlt.table refers to a Materialized View (MV), you are correct that this is the standard way to create aggregated tables in the Gold layer. A Materialized View is essentially a table that stores the results of a specific query.

Because of this, it is impossible to "ignore" certain rows in the source table or perform a partial update (e.g., only updating the current month) within a standard MV. Doing so would break the fundamental promise of a Materialized View: that the data consistently reflects the results of its defining query.

Regarding your attempt with AUTO CDC (formerly APPLY CHANGES INTO): this feature is designed to update rows based on a primary key (UPSERT logic) rather than aggregating data. Therefore, it is not the appropriate tool for your Gold layer requirements.

To handle partial updates and ensure that late-arriving data from previous months does not change your historical Gold records, you must ensure that this data is filtered before or during the aggregation process. You can achieve this in two ways:

  1. Filter at the Bronze/Silver Layer: Filter out late-arriving rows during the ingestion process. For example, if the (year, month) of updated_timestamp is earlier than the (year, month) of current_date(), the row is considered late and is dropped. This is a good solution if you can safely ignore late-arriving data entirely.

  2. Filter in the Gold Layer Definition: Introduce a current_date() AS ingestion_date column in your Bronze/Silver layer to track when a row was first processed. In your Gold MV definition, apply a filter such as WHERE year(updated_timestamp) = year(ingestion_date) AND month(updated_timestamp) = month(ingestion_date). This ensures that if a row arrives in February but carries a January timestamp, it is excluded from the aggregation.
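In both options, the "late row" test boils down to comparing the event's (year, month) against the processing (year, month); comparing months alone would misclassify rows across year boundaries (a December event processed in January is late even though 12 is not less than 1). A minimal sketch of that predicate in plain Python (the function name and example timestamps are illustrative, not part of the original code):

```python
from datetime import date, datetime

def is_late(updated_timestamp: datetime, processing_date: date) -> bool:
    # A row is late when its event (year, month) precedes the
    # (year, month) in which it is being processed.
    return (updated_timestamp.year, updated_timestamp.month) < (
        processing_date.year,
        processing_date.month,
    )

# A January event processed in February is late and would be dropped:
print(is_late(datetime(2025, 1, 31, 23, 59), date(2025, 2, 1)))  # True
# A December 2024 event processed in January 2025 is also late,
# even though month 12 is not numerically less than month 1:
print(is_late(datetime(2024, 12, 31), date(2025, 1, 2)))         # True
# An on-time event:
print(is_late(datetime(2025, 2, 3), date(2025, 2, 15)))          # False
```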

In both scenarios, you can continue using Materialized Views for your Gold layer aggregations while ensuring that previous months remain unchanged. This approach aligns perfectly with the "declarative" nature of Lakeflow Spark Declarative Pipelines.
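As an illustration of the second option, a pipeline could look roughly like the sketch below. This only runs inside a DLT/Lakeflow pipeline, and the table and column names (sales_silver, amount, updated_timestamp) are placeholders, not your actual schema:

```python
import dlt
from pyspark.sql.functions import col, current_date, month, year
from pyspark.sql.functions import sum as sum_

@dlt.table(name="sales_silver_stamped")
def sales_silver_stamped():
    # Streaming read, so each row is stamped exactly once, when it is
    # first processed, and keeps that stamp on subsequent runs.
    return dlt.read_stream("sales_silver").withColumn(
        "ingestion_date", current_date()
    )

@dlt.table(name="sales_gold_monthly")
def sales_gold_monthly():
    src = dlt.read("sales_silver_stamped")
    # Keep only rows whose event (year, month) matches the (year, month)
    # in which they were ingested; late arrivals are excluded, so
    # previously aggregated months cannot change.
    on_time = src.where(
        (year(col("updated_timestamp")) == year(col("ingestion_date")))
        & (month(col("updated_timestamp")) == month(col("ingestion_date")))
    )
    return (
        on_time.groupBy(
            year("updated_timestamp").alias("yr"),
            month("updated_timestamp").alias("mo"),
        ).agg(sum_("amount").alias("total_amount"))
    )
```

Note that the ingestion stamp is applied on a streaming table rather than in the Materialized View itself; stamping inside the MV would recompute current_date() on every refresh and defeat the purpose.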

Hope it helps!

View solution in original post
