Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Incremental monthly UDF analytics on large time-series: full reprocess vs change-driven recompute

valiro21
Contributor

Hi everyone,

I’m looking for advice and best practices for running monthly analytics on large time-series data in Databricks, where the source data can be updated retroactively.

Data model

We have a Delta table with time-series measurements:
- customer_id
- granularity (ISO duration strings like PT15M, PT30M)
- timestamp (aligned with the granularity)
- value (the measurement)
New rows can be appended, and existing rows can be updated (late arriving corrections, backfills, etc.).

Workload

We run a Python UDF (treat it as a black box) that takes one customer + one month of time-series as input and outputs a few derived metrics (it is a non-trivial computation).
The users would prefer the results to reflect late updates as soon as possible, so we need to run this daily, not just once per month.

Scale
- around 300k customers
- multiple years of data
- historical updates can happen months later (e.g., data transfer issues)
For example, in Q4 last year we had approx. 5B rows.


Approaches I am considering:
1) Full reprocess with hot and cold months

The daily job recomputes analytics for all customers for:
- the previous H (hot) months
- optionally, the last C (cold) months as well

Pros:
- simple + clean pipeline
- easiest to maintain / debug

Cons:
- lots of wasted recomputation if only a small % of data changes

Open questions:
- How do people pick H and C, and how should each be set for a daily run?
- Any rules of thumb or patterns you have seen work in production?
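To make the knobs concrete, here is a minimal pure-Python sketch of how I picture the window selection; the weekly cold-window cadence and the default H=3 / C=12 values are placeholders, not decisions:

```python
from datetime import date

def months_back(today: date, n: int) -> list[str]:
    """Return the n months ending at today's month, as 'YYYY-MM' strings."""
    out = []
    y, m = today.year, today.month
    for _ in range(n):
        out.append(f"{y:04d}-{m:02d}")
        m -= 1
        if m == 0:
            y, m = y - 1, 12
    return out

def months_to_recompute(today: date, hot: int = 3, cold: int = 12) -> list[str]:
    """Hot months are recomputed every day; the wider cold window only on
    Mondays (an arbitrary cadence chosen for illustration)."""
    window = cold if today.weekday() == 0 else hot
    return months_back(today, window)
```

The daily job would then recompute all (customer_id, month) pairs for the returned months.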


2) Change-driven recompute
The daily job detects which (customer_id, month) partitions changed since last run via CDF and then recomputes only those keys.

Pros:
- minimizes wasted compute

Cons:
- higher implementation + maintenance complexity (change capture, state, joins)

Open questions:
- How do you decide the "lookback limit" here (if any)?

- Do people combine this with an occasional "deep recompute" outside the limit? (The deep recompute can become quite expensive and might take too much time)

- What’s a robust pattern for building the changed (customer_id, month) set?

- Is there a hidden cost when doing CDC? (Is there a significant overhead in compute and storage requirements)

- Have you seen this pattern used in production?
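As a thought experiment for the changed-key set, a pure-Python sketch (in a real job the change rows would come from Delta CDF via table_changes / readChangeFeed; here they are simulated as plain dicts, and the lookback limit is one of the open knobs):

```python
def changed_keys(change_rows, last_run_month: str, lookback_months: int):
    """Distinct (customer_id, month) keys touched since the last run,
    ignoring changes older than the lookback limit."""
    # Month arithmetic on 'YYYY-MM' strings.
    y, m = map(int, last_run_month.split("-"))
    total = y * 12 + (m - 1) - lookback_months
    floor_month = f"{total // 12:04d}-{total % 12 + 1:02d}"

    keys = set()
    for row in change_rows:  # one row per changed measurement
        month = row["timestamp"][:7]  # 'YYYY-MM-DD...' -> 'YYYY-MM'
        if month >= floor_month:
            keys.add((row["customer_id"], month))
    return sorted(keys)
```

Each returned key would then be fed to the UDF for recomputation.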

Also, since the goal is to reduce cost and runtime while staying correct, beyond the open questions above I am trying to find out:

1. In Databricks / Delta production projects, is the "industry standard" more like full recompute, or change-driven recompute, or another approach?

2. If you do change-driven, what is the recommended way to detect changes at this scale (Delta CDF vs other patterns)?

3. Any advice on how to compare both approaches fairly (metrics to track, break-even points, etc.)?

4. Has anyone implemented a hybrid approach (incremental most days, fallback to full recompute if too many customers/months changed)?
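For question 4, the hybrid fallback I have in mind could be as simple as the following (the 20% break-even fraction is a made-up placeholder that would need calibrating against measured job costs):

```python
def choose_strategy(n_changed_keys: int, n_total_keys: int,
                    fallback_fraction: float = 0.2) -> str:
    """Pick incremental recompute unless so many (customer_id, month) keys
    changed that a full recompute is likely cheaper."""
    if n_total_keys == 0:
        return "noop"
    if n_changed_keys / n_total_keys > fallback_fraction:
        return "full"
    return "incremental"
```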

1 ACCEPTED SOLUTION


8 REPLIES

aleksandra_ch
Databricks Employee

Hi @valiro21 ,

Your scenario is a classic late-arriving-data and incremental-refresh challenge. While you could build custom logic to track these changes, there is a native approach in Databricks that simplifies this significantly: Lakeflow Spark Declarative Pipelines with Materialized Views. Before addressing your questions, I would like you to consider this approach first.

This approach effectively turns a complex "change-driven" problem into a simple, declarative one. Here is how I’d structure your solution to achieve the best performance-to-maintenance ratio:

The Recommended Pattern: Materialized Views

Instead of manually choosing between a "Full Reprocess" or a custom "CDC" engine, use Materialized Views (MVs). The engine automatically detects which source data has changed and refreshes only the affected monthly buckets.

Requirements:

  1. Serverless Compute: Pipeline should run on serverless to unlock incremental refresh for Materialized Views.

  2. Deterministic UDFs: Ensure your Python UDF is registered in Unity Catalog and marked as DETERMINISTIC. This is crucial for the engine to safely incrementalize the results.

  3. The "PySpark Challenge": While your UDF works as a black box, I highly recommend rewriting the logic into native PySpark / SQL. This allows the optimizer to "see" the logic, enabling much more granular incremental updates and significantly lower compute costs. LLM assistants are a great help for this.

  4. Source Table Tuning: Enable Deletion Vectors and Row Tracking on your source Delta table. These features allow the engine to pinpoint exactly which rows were updated or deleted without scanning entire files.

The initial run will reprocess the whole history of the table. After new data is ingested into the source table, run the pipeline update again. Monitor the event log to check whether the update was incremental. Some adjustments may be needed to achieve incrementalization. Refer to this page for more guidance.

Hope it helps

 

Thanks for the clarification. From what I understand, this is essentially approach 2, but instead of implementing custom incremental logic, we rely on the Materialized View engine to detect which groups changed and recompute only those.

To make it concrete, let’s say I create a materialized view with this query:

SELECT
  customer_id,
  date_trunc('month', timestamp) AS month,
  median(value) AS median_value
FROM time_series_table
GROUP BY
  customer_id,
  date_trunc('month', timestamp)

In this scenario, my expectation would be that during a refresh, Databricks automatically detects which (customer_id, month) combinations were affected by new or updated data and only recomputes those groups.

Can you confirm if that’s indeed how it works internally?

In other words, does the Materialized View engine track which group keys changed and incrementally recompute only those, without custom logic on our side?

Also, are there situations where the Materialized View ends up scanning or recomputing significantly more groups than strictly necessary (e.g. falling back to a broader recomputation or even full refresh)? Is there anywhere I can inspect the number of groups recomputed in a refresh?

That would really help to validate whether we can fully rely on the Materialized View for this pattern at scale.

 @valiro21 ,

Yes, your understanding of automatic detection of new rows and incremental updates is correct.  

There is a list of operators supported by incremental refresh: ensure that the MV query complies with it. Otherwise, it will fall back to full recompute.

Leverage EXPLAIN CREATE MATERIALIZED VIEW to validate if the query definition is eligible for incremental refresh.

There are other scenarios where refresh may fall back to full recompute (for example, if there were too many changes in the source tables). Follow these good practices:

  • Decompose Complex MVs: Split large, complex MVs into multiple smaller ones. Excessive joins or deeply nested operators can sometimes exceed the complexity threshold for incrementalization.
  • Increase Update Frequency: If source tables change significantly between runs, the model may determine a full recompute is cheaper. If you see CHANGESET_SIZE_THRESHOLD_EXCEEDED in your logs, try running updates more frequently to reduce the volume of changes per update.
  • Ensure Deletion Vectors and Row-Level Tracking are enabled on your source tables: Deletion vectors minimize the changeset size, and row-level tracking is a prerequisite for incrementalizing certain operators.

Finally, you can force incremental refresh instead of relying on an automatic choice. 

You can monitor the refresh strategy (incremental or full recompute), as well as the reason why it was chosen, in the event log.

Best regards,

Thank you for the previous clarifications regarding incremental refresh and fallback behavior. I have a few follow-up questions to better understand how Materialized Views behave in practice.

1. Full refresh semantics

Does a full refresh of a Materialized View always replace the entire target table, or does it merge results with what already exists?
Consider the following example:
CREATE MATERIALIZED VIEW mv_monthly_median AS
SELECT
  customer_id,
  date_trunc('month', timestamp) AS month,
  percentile_approx(value, 0.5) AS median_value
FROM time_series_table
WHERE date_trunc('month', timestamp) >=
      add_months(date_trunc('month', current_date()), -12)
GROUP BY
  customer_id,
  date_trunc('month', timestamp);


This computes the monthly median per customer for the last 12 months.

Suppose:

I run this today for the first time. Then I run it again 2 months later. Which of the following behaviors is correct?

1. A full refresh recomputes the last 12 months and merges them with existing results -> final table contains 14 months.
2. A full refresh recomputes the last 12 months and replaces all existing data -> final table contains 12 months.
3. An incremental refresh computes only the 2 new months and merges them -> final table contains 14 months.
4. An incremental refresh computes only the 2 new months and also deletes months now outside the 12-month window -> final table contains 12 months.

My understanding is that option 4 is what happens, since the MV reflects exactly the query definition and not accumulated history. Could you confirm?
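To be explicit about what option 4 would mean, here is a tiny pure-Python model of my assumption that the MV always equals its defining query over the current source data:

```python
def mv_contents(source_months: list[str], current_month: str, window: int = 12):
    """Model of a declarative MV with a rolling monthly window: the result
    is exactly the query output, so months that fall out of the window
    disappear on refresh (option 4), regardless of what was there before."""
    y, m = map(int, current_month.split("-"))
    total = y * 12 + (m - 1) - (window - 1)
    floor = f"{total // 12:04d}-{total % 12 + 1:02d}"
    return sorted(mth for mth in set(source_months) if mth >= floor)
```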

If the desired behavior is option 3 (i.e., accumulate historical results beyond the rolling window), would the recommended approach be:
1. Create the MV with the rolling window logic
2. Then have a downstream table that merges new/updated rows from the MV into a historical results table

Is this the intended pattern?
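The accumulation step in option 3 would conceptually be an upsert keyed by (customer_id, month), sketched here in plain Python (in Databricks I assume this would be a MERGE INTO from the MV into a regular Delta history table):

```python
def merge_into_history(history: dict, mv_rows: list[dict]) -> dict:
    """Upsert the rolling-window MV output into an accumulating history
    table, keyed by (customer_id, month). Keys that dropped out of the MV's
    window are deliberately kept, so history only grows or gets corrected."""
    for row in mv_rows:
        history[(row["customer_id"], row["month"])] = row["median_value"]
    return history
```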


2. Best practices for defining and deploying Materialized Views

- Are MVs SQL-only, or can they be defined using Python?
- If SQL-only, is it correct that Python UDF logic must be wrapped and called from SQL? Is it recommended to manage MVs using Databricks Asset Bundles for a dev/qas/prod setup?
- If definition via Python is allowed, how should those MVs be deployed?
- What is considered best practice for deploying MVs in production? Specifically:
- When a MV is redeployed (with the same name), is it effectively dropped and recreated, with its history dropped?
- OR does replacing an MV via deployment preserve its refresh state/history? If so:
- If I redeploy the same MV definition with no meaningful changes, does that trigger a full refresh?
- If I change something trivial (e.g., formatting or alias), does it force recomputation?
- If I change the filter window (see example below), does Databricks:
- Fully recompute everything?
- Or incrementally compute only the newly included time range?

Original example:
WHERE date_trunc('month', timestamp) >=
add_months(date_trunc('month', current_date()), -12)

New deployment:
WHERE date_trunc('month', timestamp) >=
add_months(date_trunc('month', current_date()), -16)


Would this cause:
- A full refresh?
- Or an incremental refresh that recomputes only the additional 4 months?

I am trying to understand how stable MV lifecycle management is in a CI/CD setup and how refresh behavior interacts with redeployments.

Thank you!

@valiro21 ,

You are correct, the behaviour will be as described in option 4. Because an MV is a declarative object, it must reflect the results of the query as if it were run from scratch. If your WHERE clause limits data to the last 12 months, any data older than 12 months will be deleted from the MV during the refresh to maintain consistency with the source.

The recommended pattern

Typically with SDP you don't need to define "last 12 months"; the incremental refresh handles that automatically. With the approach below:

CREATE MATERIALIZED VIEW mv_monthly_median AS
SELECT
  customer_id,
  date_trunc('month', timestamp) AS month,
  percentile_approx(value, 0.5) AS median_value
FROM time_series_table
GROUP BY
  customer_id,
  date_trunc('month', timestamp);

The MV will naturally accumulate all history (14, 20, 24 months, etc.). MVs use incremental refreshes: only new data is processed as it arrives, and old months are never touched (unless late-arriving data affects them). Simply apply the WHERE month >= ... filter in your downstream BI tool or Gold-layer view. This keeps your storage logic simple and your compute efficient.

2. Other questions:

  • Databricks Asset Bundles (DABs) is the gold standard for managing MVs across Dev/QAS/Prod environments.
  • SDP (MVs and STs) are available in both SQL and Python
  • Changing the MV formatting or table aliases within the query does not trigger a full refresh. Any change to the query logic itself (i.e., adding new column, changing group or aggregations, renaming columns) will trigger a full refresh.
  • To avoid a massive recomputation when logic changes, you can use the APPEND ONCE FLOW pattern. You create a new version of the MV, migrate existing data into it, and then allow the incremental refresh to take over for new data.
    • Create a new MV with new logic
    • Copy and modify the data from old MV to the new MV with APPEND ONCE 
    • Run refresh to get new data with the new logic

Best regards,

 

Thank you for the detailed explanation, this clarifies a lot.

I still have concerns around the APPEND ONCE FLOW pattern, especially in the context of dependent MVs and CDC propagation.

Let me outline two scenarios to explain the issue.

Scenario 1 – No APPEND ONCE flow

Suppose I change the aggregation logic from:

median(value)

to:

avg(value)

This is clearly a logical change, so it should trigger a full refresh of the MV. That part is expected and acceptable.

However, my concern is what happens downstream.

Assume:
- The new values (avg vs median) are numerically very close.
- Only around 2% of rows actually change value.
- But because the MV is fully recomputed, every row is rewritten.

From a CDC perspective, this would appear as massive changes (all rows updated), even if most values are identical.

If I have downstream MVs depending on this MV, would they now:
- Detect massive upstream changes
- And therefore also trigger full refresh instead of incremental refresh?

In other words:

Even if the logical difference is small, does a full recomputation upstream automatically cascade into full recomputation downstream because of rewritten rows?

This is the main architectural concern.

Scenario 2 – Using APPEND ONCE flow

The APPEND ONCE pattern is very clear in streaming use cases, where:
- A streaming table continues processing
- You append backfill data without resetting the checkpoint, basically a UNION operation

However, for MVs (which are declarative, not streams; I am not sure an append flow is even allowed here), I am struggling to understand how this avoids recomputation in practice.

Let me walk through a simplified example.

Suppose I change from:

median_value

to:

median_value + 1

Now I try to apply APPEND ONCE flow.

The steps that were described:
1. Create a new MV with new logic
2. Copy and modify data from old MV into the new MV using APPEND ONCE
3. Run refresh to get new data

Questions:
- If I create a new MV with new logic, does it need a new name?
- If the logic changed, would refresh not trigger a full recomputation anyway?
- If I migrate data from the old MV into the new MV (for example apply +1 manually), then run refresh:
- Will Databricks do an incremental refresh on the first run with the changes? (This contradicts "Any change to the query logic triggers a full refresh").
- Or will it still perform a full refresh due to logic change?

How exactly does APPEND ONCE prevent full recomputation in a non-streaming MV?

Core concern:

Even if APPEND ONCE avoids recomputing the table itself, the CDC volume issue remains.

If I:
- Recompute or rewrite a large MV (even with identical values),
- then downstream MVs may detect large upstream changes,
- which could force them into full refresh mode as well.

Is there any way to avoid this? I am trying to understand how to safely evolve aggregation logic in layered MV architectures without triggering cascading full recomputations and massive CDC events.
Any deeper clarification on how APPEND ONCE works internally for MVs (not streaming tables) would be very helpful.

Thank you.

Hi @valiro21 ,

You are correct that in your case the APPEND ONCE flow won't work, as you have an MV as the target.

The scenario that you provided is quite specific. Most of the time, if the MV logic changes, the new logic will require looking back at the source data and recomputing the aggregates. The engine is not aware that avg and median are close values for your dataset. It will need to recompute anyway to keep the promise of the MV being "always correct".

An option would be to include both metrics (avg and median) in the initial MV to avoid changing the MV definition.

Another option: create a new MV with new logic and union it together with the modified values from the old MV.

Downstream MV will indeed get updates of the upstream MV and will potentially need to recompute old values (this can happen incrementally). You can perform tests and check how it behaves.

Best regards,

So if I understand correctly, the general recommendation for this type of workload is to leverage the new Spark 4 pipelines with Materialized Views for incremental updates, while being mindful of the constraints around logic changes, full refresh behavior, and downstream recomputation.

In our case, it seems that MV-based incremental refresh is the right direction, but we need to carefully design around:
- Logical changes, which can trigger large CDC propagation to downstream MVs,
- Architectural choices to minimize cascading recomputations.

From my side, the core questions raised in this thread are now sufficiently clarified, particularly considering the capabilities introduced with the new Spark 4 pipelines. I will open a separate topic focused specifically on some of the more advanced edge cases and architectural patterns for those use cases.

Thank you for the clarifications!