3 weeks ago
[Sorry for a novice question.]
I have multiple tables periodically updated from external sources (including insert, update, or delete). I need to update a target table, which is an outer join from multiple source tables without rewriting it each time. I do not need to do it in real time, but only once a day.
What are Databricks' best practices, techniques, etc?
Will appreciate pointers to documentation, examples, and/or Academy lectures.
Regards,
Stas
3 weeks ago
Greetings @staskh, I did some digging and compiled my thoughts on your question.
Treat this as a standard Gold table built daily from Delta sources. Start simple. Add incremental tricks only when full recomputes stop scaling. Most teams overbuild this on day one.
Below are three patterns, ordered from simplest to most involved. Pick the lowest-numbered one that fits your data volume and SLA, then graduate only when you have a real reason to.
Pattern 1: full daily rebuild. If your data volumes are reasonable, recompute the whole target table once a day. This is the simplest, most reliable approach, and it's often the right one to keep.
Example:
CREATE OR REPLACE TABLE analytics.target_joined AS
SELECT ...
FROM source_a a
FULL OUTER JOIN source_b b ON ...
FULL OUTER JOIN source_c c ON ...
;
Why this works: everything is Delta, everything is ACID, and you avoid the complexity of incremental join logic. Don't move past this pattern unless rebuilds are clearly hurting cost or latency.
Pattern 2: materialized view. If the joined result is large and rebuilds get costly, define the target as a materialized view over your Delta sources and let Databricks refresh it incrementally where it can.
Do this:
CREATE OR REPLACE MATERIALIZED VIEW analytics.target_joined_mv AS
SELECT ...
FROM source_a a
FULL OUTER JOIN source_b b ON ...
FULL OUTER JOIN source_c c ON ...
;
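-- Repeat for source_b and source_c. These Delta properties support
-- incremental refresh; see (1) for which ones your query needs.
ALTER TABLE source_a SET TBLPROPERTIES (
delta.enableDeletionVectors = true,
delta.enableRowTracking = true,
delta.enableChangeDataFeed = true
);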
Serverless pipelines can incrementally refresh many queries, including ones with LEFT, RIGHT, and FULL OUTER JOIN, but only when the documented conditions are met. Read the conditions before you commit (1).
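Pattern 3: CDC-driven incremental updates. Use this when your source systems expose inserts, updates, and deletes, and you need to propagate just the changes.
The starting points are the CDC overview (2), the AUTO CDC APIs (3), Delta Change Data Feed (4), and a blog walkthrough (5).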
Don't reach for this pattern unless you actually need it. It scales to large, frequently changing tables and preserves history (SCD2) when you want it, but the engineering and operational cost is real.
Take these two courses, in order:
They walk through these exact patterns and complement the docs.
(1) Incremental refresh for materialized views: https://docs.databricks.com/aws/en/optimizations/incremental-refresh
(2) Change data capture and snapshots: https://docs.databricks.com/aws/en/ldp/what-is-change-data-capture
(3) The AUTO CDC APIs: https://docs.databricks.com/aws/en/ldp/cdc
(4) Delta Change Data Feed: https://docs.databricks.com/aws/en/delta/delta-change-data-feed
(5) CDC with Delta Live Tables: https://www.databricks.com/blog/2022/04/25/simplifying-change-data-capture-with-databricks-delta-liv...
Hope this helps.
Cheers, Louis.
3 weeks ago
Hi @staskh,
For a daily batch use case, I would look at 3 patterns depending on the size and complexity of your tables.
IMO, the simplest and usually safest option is to create the target as a Databricks SQL materialized view if it is mainly the result of a join or denormalization. Materialized views physically store the result and can refresh on a schedule, and Databricks can often refresh them incrementally. Outer joins are supported for incremental refresh when the requirements are met, although Databricks may still choose a full refresh if that is cheaper or required by the query plan.
Note that for Delta sources, incremental MV refresh requires features such as row tracking or CDF on the source tables.
Here is an example:
CREATE OR REPLACE MATERIALIZED VIEW gold.target_joined
SCHEDULE CRON '0 0 2 * * ?' AT TIME ZONE 'UTC'
AS
SELECT
COALESCE(a.id, b.id, c.id) AS id,
a.col1,
b.col2,
c.col3
FROM silver.source_a a
FULL OUTER JOIN silver.source_b b ON a.id = b.id
FULL OUTER JOIN silver.source_c c ON COALESCE(a.id, b.id) = c.id;
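If you want to trigger or test a refresh outside the schedule, something like the following should work. This is only a sketch: gold.target_joined is the view name from the example above, and whether the first statement actually runs incrementally still depends on the requirements mentioned earlier.

```sql
-- Refresh on demand; Databricks decides between an incremental and a full refresh.
REFRESH MATERIALIZED VIEW gold.target_joined;

-- Force a complete recomputation of the view.
REFRESH MATERIALIZED VIEW gold.target_joined FULL;
```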
If you need more control, use Delta Change Data Feed (CDF) with MERGE: enable CDF on each source table, collect the changed business keys from all sources since the previous run, recompute only those keys from the current source tables, and then merge them into the target.
Delta MERGE supports inserts, updates, and deletes, and table_changes() supports batch reads from CDF by version or timestamp.
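As a rough illustration of those two building blocks, here is how enabling CDF and reading it in batch could look. The version numbers and the timestamp are placeholders I made up; substitute the values you track between runs.

```sql
-- Enable CDF on a source table; changes are recorded only from this point on.
ALTER TABLE silver.source_a SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Read changes by version range (start and end are both inclusive).
-- Versions 10 and 15 are placeholders.
-- _change_type is one of: insert, update_preimage, update_postimage, delete.
SELECT id, _change_type, _commit_version, _commit_timestamp
FROM table_changes('silver.source_a', 10, 15);

-- Read changes from a timestamp onward (placeholder timestamp).
SELECT id, _change_type, _commit_version, _commit_timestamp
FROM table_changes('silver.source_a', '2025-01-01 00:00:00');
```

Storing the highest _commit_version you processed per source after each run is one simple way to know where the next run should start.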
Another simple example to show what I mean:
-- 1. you start with collecting impacted keys from all sources
CREATE OR REPLACE TEMP VIEW changed_keys AS
SELECT id FROM table_changes('silver.source_a', <last_version_a> + 1)
UNION
SELECT id FROM table_changes('silver.source_b', <last_version_b> + 1)
UNION
SELECT id FROM table_changes('silver.source_c', <last_version_c> + 1);
-- 2. then you recompute only impacted keys from the current source state
--    (keys deleted from every source are skipped here and need separate delete handling)
CREATE OR REPLACE TEMP VIEW recomputed_rows AS
SELECT
COALESCE(a.id, b.id, c.id) AS id,
a.col1,
b.col2,
c.col3
FROM changed_keys k
LEFT JOIN silver.source_a a ON k.id = a.id
LEFT JOIN silver.source_b b ON k.id = b.id
LEFT JOIN silver.source_c c ON k.id = c.id
WHERE COALESCE(a.id, b.id, c.id) IS NOT NULL;
-- 3. and you merge into target
MERGE INTO gold.target_joined t
USING recomputed_rows s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
For deletes, be careful: with an outer join, a delete in one source does not always mean deleting the target row, because the key might still exist in another source.
So the best approach is to detect the changed keys, recompute the final joined row for those keys, and delete the target row only if the key no longer exists in any source.
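One way to sketch that delete rule in a single MERGE is shown below. It assumes the changed_keys view from step 1, that id is unique within each source table, and that gold.target_joined has exactly the columns id, col1, col2, col3. The view name recomputed_with_flag and the column is_gone are names I invented for the illustration.

```sql
-- Recompute every changed key and flag the keys that no longer exist in any source.
CREATE OR REPLACE TEMP VIEW recomputed_with_flag AS
SELECT
  k.id,
  a.col1,
  b.col2,
  c.col3,
  (a.id IS NULL AND b.id IS NULL AND c.id IS NULL) AS is_gone  -- hypothetical flag
FROM (SELECT DISTINCT id FROM changed_keys WHERE id IS NOT NULL) k
LEFT JOIN silver.source_a a ON k.id = a.id
LEFT JOIN silver.source_b b ON k.id = b.id
LEFT JOIN silver.source_c c ON k.id = c.id;

MERGE INTO gold.target_joined t
USING recomputed_with_flag s
ON t.id = s.id
-- Key vanished from all sources: remove the target row.
WHEN MATCHED AND s.is_gone THEN DELETE
-- Key still exists somewhere: overwrite with the freshly joined values.
WHEN MATCHED THEN UPDATE SET col1 = s.col1, col2 = s.col2, col3 = s.col3
-- New key: insert it, unless it was created and deleted within the same window.
WHEN NOT MATCHED AND NOT s.is_gone THEN
  INSERT (id, col1, col2, col3) VALUES (s.id, s.col1, s.col2, s.col3);
```

The columns are listed explicitly instead of using UPDATE SET * and INSERT * so that the helper flag never has to exist in the target table.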
If your upstream sources provide CDC events or only periodic snapshots, consider Lakeflow Declarative Pipelines AUTO CDC / AUTO CDC FROM SNAPSHOT. These APIs are designed to simplify SCD type 1 and type 2 handling from CDC feeds or snapshots.
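For orientation, a pipeline definition using AUTO CDC might look roughly like this. It is a sketch based on my reading of the AUTO CDC SQL syntax, so check it against the documentation for your workspace. The names cdc_feed_a, source_a_current, operation, and sequence_num are hypothetical, and the statements only run inside a Lakeflow Declarative Pipeline, not as ad hoc queries.

```sql
-- Target table maintained by the pipeline (hypothetical name).
CREATE OR REFRESH STREAMING TABLE source_a_current;

-- Apply a CDC feed to the target, keeping only the latest row per key (SCD type 1).
CREATE FLOW source_a_cdc_flow AS AUTO CDC INTO
  source_a_current
FROM
  stream(cdc_feed_a)                 -- hypothetical CDC feed with one row per change event
KEYS
  (id)
APPLY AS DELETE WHEN
  operation = 'DELETE'               -- hypothetical column holding the change type
SEQUENCE BY
  sequence_num                       -- hypothetical column that orders the events
COLUMNS * EXCEPT
  (operation, sequence_num)
STORED AS
  SCD TYPE 1;
```

A flow like this keeps one source table current; the outer join across sources would still be a separate step on top of the resulting tables.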
3 weeks ago
Thank you for such an informative and helpful response!