Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Expensive join in a Spark Declarative Pipeline against a Lakeflow Connect table

leopold_cudzik
New Contributor II

I'm trying to resolve an issue and would like an expert opinion on what the right solution actually is.
I have a SQL Server instance with CDC enabled and two tables: big_source (hundreds of millions of rows) and small_source (a small number of rows).
I set up Lakeflow Connect to move these tables into our bronze layer as Delta tables with SCD type 1. I then created an SDP pipeline that
moves data from bronze to silver into a single table, Target, which is essentially a join of the two.
My goal is also to propagate deletes and updates from the sources to Target. When a change occurs in big_source, this is fine, as I'm joining a small
stream of changes against the small_source table. However, when a change occurs in small_source, I'm joining a tiny stream of changes against the huge big_source table, and
I see large data movements (rows processed, bytes read) while processing those tiny changes. What is the best way to optimize this operation to minimize reads of big_source? (I tried to enable clustering on the Lakeflow table, but that does not seem to be supported, so I created a new dp.table that merely mirrors big_source.) Thanks for any hint.

3 REPLIES

soloengine
New Contributor II

Hi,
You could look into Bloom filter indexes. They can drastically decrease I/O while keeping a small storage footprint.
A Bloom filter tells Spark when a join key is definitely not present in a file, so that file can be skipped entirely (no false negatives, so every skip is 100% correct).
Note that Bloom filters only help equality (=) joins.

I would recommend trying a combination of Liquid Clustering and a Bloom filter index.
Let me know if this improves the data movement you are seeing.
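To illustrate the no-false-negative property mentioned above, here is a minimal pure-Python Bloom filter sketch (this is just the idea, not the Databricks index implementation; the keys and sizes are made up): membership tests may return false positives, but never false negatives, which is why a "not present" answer is always safe grounds for skipping a file.

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: no false negatives, possible false positives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, item):
        # Derive k bit positions from salted SHA-256 hashes of the item.
        for salt in range(self.num_hashes):
            digest = hashlib.sha256(f"{salt}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = True

    def might_contain(self, item):
        # False here is definitive; True only means "maybe present".
        return all(self.bits[pos] for pos in self._positions(item))

# One filter per data file: skip the file if the join key is definitely absent.
file_keys = [101, 202, 303]
bf = BloomFilter()
for key in file_keys:
    bf.add(key)

# Every inserted key must report "maybe present" -- no false negatives.
print(all(bf.might_contain(k) for k in file_keys))
```

A "maybe present" answer still forces a read, which is why Bloom filters pay off mainly when join keys are selective; that caveat is part of why the docs hedge on them (see below).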

Should I be worried by this statement in the docs?
"Important: Azure Databricks doesn't recommend using Bloom filter indexes for most workloads."
https://learn.microsoft.com/en-us/azure/databricks/optimizations/bloom-filters

SteveOstrowski
Databricks Employee

Hi @leopold_cudzik,

Great question -- this is a classic challenge with stream-static joins in Spark Declarative Pipelines, and there are several concrete things you can do. Let me break it down.


UNDERSTANDING THE ROOT CAUSE

The core issue is how streaming table joins work. When you define a streaming table that joins two sources, one side is read as a stream (the changed rows) and the other side is read as a snapshot. Per the Databricks documentation:

"Joins in streaming tables do not recompute when dimensions change."

This means when your small_source changes, the pipeline reads the small set of changed rows as a stream, but must scan the entire big_source as a static snapshot for the join. That is why you see huge data reads -- it is literally reading all of big_source to fulfill the join for just a few changed rows.

Docs: https://docs.databricks.com/en/ldp/streaming-tables.html
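A toy model of this asymmetry (pure Python with made-up row counts, just to make the cost visible; this is not Spark itself): joining a stream of changes against a static side means every micro-batch pays a read cost proportional to the static side, no matter how small the change set is.

```python
def stream_static_join(changed_rows, static_table, key):
    """Join a micro-batch of changed rows against a full static snapshot.

    Returns the joined rows plus how many static rows were read,
    mimicking a stream-static join with no data skipping.
    """
    rows_read = 0
    # Build a lookup over the *entire* static side: every batch scans it all.
    index = {}
    for row in static_table:
        rows_read += 1
        index.setdefault(row[key], []).append(row)
    joined = [{**c, **s} for c in changed_rows for s in index.get(c[key], [])]
    return joined, rows_read

big_source = [{"key": i, "b": i * 10} for i in range(1_000_000)]  # huge static side
changes = [{"key": 42, "s": "updated"}]                           # tiny stream side

joined, rows_read = stream_static_join(changes, big_source, "key")
print(rows_read)  # 1000000 -- a full static scan to process a single changed row
```

The solutions below attack exactly that `rows_read` term: either avoid the full scan (materialized view, clustering) or make the scan cheaper (broadcast, AQE).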


SOLUTION 1: USE A MATERIALIZED VIEW INSTEAD OF A STREAMING TABLE (RECOMMENDED)

The most impactful change is to define your Target as a materialized view rather than a streaming table. Materialized views support incremental refresh -- the system can intelligently determine which portion of the output needs to be recomputed when either input changes.

CREATE OR REFRESH MATERIALIZED VIEW silver.target AS
SELECT /*+ BROADCAST(s) */
b.*,
s.col1,
s.col2
FROM bronze.big_source b
JOIN bronze.small_source s ON b.key = s.key;

Key benefits:
- Incremental refresh for joins is supported for INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
- When small_source changes, the engine can identify only the affected rows and update them, rather than scanning all of big_source
- Both streaming tables and Delta tables are supported as inputs

Important requirement: Incremental refresh for materialized views requires serverless compute and row-tracking enabled on the source tables. On classic compute, the MV does a full recompute each time.

Docs: https://docs.databricks.com/en/optimizations/incremental-refresh.html


SOLUTION 2: BROADCAST THE SMALL TABLE

Regardless of streaming table or materialized view, you should broadcast the small table. This tells Spark to send the small table to every executor rather than shuffling both sides:

SELECT /*+ BROADCAST(small_source) */
b.*, s.*
FROM STREAM(bronze.big_source) b
JOIN bronze.small_source s ON b.key = s.key;

This can significantly reduce shuffle I/O.
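A rough picture of what the hint changes, as a plain-Python sketch with hypothetical data (not Spark's actual executor code): instead of shuffling both sides by key, the small table is shipped whole to every worker as a hash map, and each partition of the big side joins locally.

```python
def broadcast_hash_join(big_partitions, small_table, key):
    """Broadcast-style join: build one hash map of the small side,
    then probe it from each partition of the big side locally.
    The big side is never shuffled."""
    # "Broadcast": the whole small table becomes a lookup map on every worker.
    lookup = {row[key]: row for row in small_table}
    results = []
    for partition in big_partitions:  # each partition joins independently
        for row in partition:
            match = lookup.get(row[key])
            if match is not None:
                results.append({**row, **match})
    return results

small_source = [{"key": 1, "name": "a"}, {"key": 2, "name": "b"}]
big_partitions = [
    [{"key": 1, "val": 10}, {"key": 3, "val": 30}],
    [{"key": 2, "val": 20}],
]
print(broadcast_hash_join(big_partitions, small_source, "key"))
# -> [{'key': 1, 'val': 10, 'name': 'a'}, {'key': 2, 'val': 20, 'name': 'b'}]
```

Note that broadcasting helps the shuffle cost, not the scan cost: the big side is still read in full unless data skipping (Solution 3) also kicks in.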

Docs: https://docs.databricks.com/en/ldp/best-practices.html


SOLUTION 3: ADD LIQUID CLUSTERING TO THE BIG SOURCE TABLE

You mentioned that clustering on the Lakeflow Connect table directly is not supported. This is correct -- Lakeflow Connect manages those streaming tables. However, you can create an intermediate clustered copy:

CREATE OR REFRESH STREAMING TABLE bronze.big_source_clustered
CLUSTER BY (join_key_column)
AS SELECT * FROM STREAM(bronze.big_source);

Then use bronze.big_source_clustered in your join. This enables data skipping on the join key, so Spark can skip files that do not contain matching keys.
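The data skipping that clustering enables can be sketched like this (pure Python with a hypothetical file layout, not Delta's actual statistics format): when files are clustered by the join key, each file's min/max statistics let the reader skip files that cannot contain any key in the change batch.

```python
def files_to_scan(file_stats, wanted_keys):
    """Use per-file (min, max) stats on the join key to skip files
    that cannot contain any of the wanted keys."""
    selected = []
    for name, (lo, hi) in file_stats.items():
        if any(lo <= k <= hi for k in wanted_keys):
            selected.append(name)
    return selected

# Clustered table: each file covers a narrow, mostly disjoint key range.
clustered = {"f0": (0, 99), "f1": (100, 199), "f2": (200, 299), "f3": (300, 399)}
# Unclustered table: every file spans nearly the whole key range.
unclustered = {"f0": (0, 390), "f1": (5, 399), "f2": (0, 395), "f3": (2, 399)}

changed_keys = [150, 151]
print(files_to_scan(clustered, changed_keys))    # ['f1'] -- 3 of 4 files skipped
print(files_to_scan(unclustered, changed_keys))  # all four files still read
```

This is why clustering on the join key matters even when the join strategy is already optimal: it shrinks the set of big_source files that a tiny change batch has to touch at all.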

Docs: https://docs.databricks.com/en/delta/clustering.html


SOLUTION 4: LEVERAGE ADAPTIVE QUERY EXECUTION (AQE)

AQE is enabled by default and automatically handles several join optimizations:

- Auto broadcast conversion: If one side is small enough (default < 30MB), AQE converts to a broadcast hash join at runtime
- Skew join handling: AQE detects and splits skewed partitions
- Partition coalescing: Combines small post-shuffle partitions

You can tune AQE with pipeline-level Spark configs:

spark.databricks.adaptive.autoBroadcastJoinThreshold: "100MB"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.sql.shuffle.partitions: "auto"

Docs: https://docs.databricks.com/en/optimizations/aqe.html


SOLUTION 5: TWO-STAGE PIPELINE ARCHITECTURE

If none of the above fully resolve the issue, consider restructuring:

1. Stage 1 (Streaming Tables): Ingest both sources via Lakeflow Connect (you already have this)
2. Stage 2 (Materialized View): Join the two bronze tables with CLUSTER BY on the join key and a BROADCAST hint
3. Stage 3 (Optional): Chain additional transformations downstream


ABOUT THE BLOOM FILTER SUGGESTION

The previous reply suggested Bloom filters. While useful for some workloads, Databricks documentation notes that they are "not recommended for most workloads" and are primarily designed for selective equality filters on individual columns, not for optimizing joins. Liquid clustering and Predictive I/O are generally better alternatives for data skipping.

Docs: https://docs.databricks.com/en/optimizations/bloom-filters.html


SUMMARY (IN PRIORITY ORDER)

1. Switch Target to a materialized view with incremental refresh -- eliminates full scans when either side changes
2. Add BROADCAST hint on small_source -- eliminates shuffle for the small side
3. Add CLUSTER BY on join key via intermediate table -- enables data skipping on big_source reads
4. Tune AQE broadcast threshold -- helps runtime join strategy selection
5. Consider two-stage pipeline architecture -- clean separation of concerns


DOCUMENTATION REFERENCES

- Streaming Tables in DLT: https://docs.databricks.com/en/ldp/streaming-tables.html
- Materialized Views: https://docs.databricks.com/en/ldp/materialized-views.html
- Incremental Refresh: https://docs.databricks.com/en/optimizations/incremental-refresh.html
- DLT Best Practices: https://docs.databricks.com/en/ldp/best-practices.html
- Liquid Clustering: https://docs.databricks.com/en/delta/clustering.html
- Adaptive Query Execution: https://docs.databricks.com/en/optimizations/aqe.html

Hope this helps! Let me know if you have questions about any of these approaches.

* This reply used an agent system I built to research and draft responses based on the wide set of documentation I have available and previous memory. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand-new features.