Hi @leopold_cudzik,
Great question -- this is a classic challenge with stream-static joins in Spark Declarative Pipelines, and there are several concrete things you can do. Let me break it down.
UNDERSTANDING THE ROOT CAUSE
The core issue is how streaming table joins work. When you define a streaming table that joins two sources, one side is read as a stream (the changed rows) and the other side is read as a snapshot. Per the Databricks documentation:
"Joins in streaming tables do not recompute when dimensions change."
This means that when your small_source changes, the pipeline reads the small set of changed rows as a stream, but must scan the entire big_source as a static snapshot to complete the join. That is why you see huge data reads: every microbatch rereads all of big_source to join against just a few changed rows.
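For context, here is roughly the pattern I assume you have today (a hedged reconstruction -- table, schema, and column names are placeholders based on your description, not your actual code):

```sql
-- Hypothetical reconstruction of the current Target definition.
-- Each time changed rows arrive from small_source, the static side
-- (bronze.big_source) is scanned in full to complete the join.
CREATE OR REFRESH STREAMING TABLE silver.target AS
SELECT
  b.*,          -- full width of the big table
  s.col1,
  s.col2
FROM STREAM(bronze.small_source) s   -- streamed side: only changed rows
JOIN bronze.big_source b             -- static side: full snapshot per batch
  ON b.key = s.key;
```

If your definition differs materially from this, some of the suggestions below may need adjusting.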
Docs: https://docs.databricks.com/en/ldp/streaming-tables.html
SOLUTION 1: USE A MATERIALIZED VIEW INSTEAD OF A STREAMING TABLE (RECOMMENDED)
The most impactful change is to define your Target as a materialized view rather than a streaming table. Materialized views support incremental refresh -- the system can intelligently determine which portion of the output needs to be recomputed when either input changes.
CREATE OR REFRESH MATERIALIZED VIEW silver.target AS
SELECT /*+ BROADCAST(s) */
b.*,
s.col1,
s.col2
FROM bronze.big_source b
JOIN bronze.small_source s ON b.key = s.key;
Key benefits:
- Incremental refresh supports common join types, including INNER and LEFT OUTER joins (check the docs for the current list of supported join types and their limitations)
- When small_source changes, the engine can identify only the affected rows and update them, rather than scanning all of big_source
- Both streaming tables and Delta tables are supported as inputs
Important requirement: Incremental refresh for materialized views requires serverless compute and row-tracking enabled on the source tables. On classic compute, the MV does a full recompute each time.
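To prepare for incremental refresh, you can enable row tracking on the inputs. A minimal sketch, assuming the table names from above:

```sql
-- Enable row tracking on the existing Delta source tables
-- (required, along with serverless compute, for incremental MV refresh).
ALTER TABLE bronze.big_source
  SET TBLPROPERTIES ('delta.enableRowTracking' = 'true');

ALTER TABLE bronze.small_source
  SET TBLPROPERTIES ('delta.enableRowTracking' = 'true');
```

Note that enabling row tracking on an existing table only tracks rows going forward; the first refresh after enabling it may still be a full recompute.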
Docs: https://docs.databricks.com/en/optimizations/incremental-refresh.html
SOLUTION 2: BROADCAST THE SMALL TABLE
Regardless of whether you use a streaming table or a materialized view, you should broadcast the small table. This tells Spark to send a copy of the small table to every executor rather than shuffling both sides. Note that when the table is aliased, the hint must reference the alias:
SELECT /*+ BROADCAST(s) */
  b.*,
  s.col1,
  s.col2
FROM STREAM(bronze.big_source) b
JOIN bronze.small_source s ON b.key = s.key;
This can significantly reduce shuffle I/O.
Docs: https://docs.databricks.com/en/ldp/best-practices.html
SOLUTION 3: ADD LIQUID CLUSTERING TO THE BIG SOURCE TABLE
You mentioned that clustering on the Lakeflow Connect table directly is not supported. This is correct -- Lakeflow Connect manages those streaming tables. However, you can create an intermediate clustered copy:
CREATE OR REFRESH STREAMING TABLE bronze.big_source_clustered
CLUSTER BY (join_key_column)
AS SELECT * FROM STREAM(bronze.big_source);
Then use bronze.big_source_clustered in your join. This enables data skipping on the join key, so Spark can skip files that do not contain matching keys.
Docs: https://docs.databricks.com/en/delta/clustering.html
SOLUTION 4: LEVERAGE ADAPTIVE QUERY EXECUTION (AQE)
AQE is enabled by default and automatically handles several join optimizations:
- Auto broadcast conversion: If one side is small enough (default < 30MB), AQE converts to a broadcast hash join at runtime
- Skew join handling: AQE detects and splits skewed partitions
- Partition coalescing: Combines small post-shuffle partitions
You can tune AQE with pipeline-level Spark configs:
spark.databricks.adaptive.autoBroadcastJoinThreshold: "100MB"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.sql.shuffle.partitions: "auto"
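In a pipeline, these are typically set in the pipeline settings JSON under the configuration map (the values below are illustrative starting points, not recommendations for your workload):

```json
{
  "configuration": {
    "spark.databricks.adaptive.autoBroadcastJoinThreshold": "100MB",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.shuffle.partitions": "auto"
  }
}
```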
Docs: https://docs.databricks.com/en/optimizations/aqe.html
SOLUTION 5: TWO-STAGE PIPELINE ARCHITECTURE
If none of the above fully resolve the issue, consider restructuring:
1. Stage 1 (Streaming Tables): Ingest both sources via Lakeflow Connect (you already have this)
2. Stage 2 (Materialized View): Join the two bronze tables with CLUSTER BY on the join key and a BROADCAST hint
3. Stage 3 (Optional): Chain additional transformations downstream
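Putting stages 2 and 3 together, a hedged sketch (all names are placeholders, and the gold-layer filter is purely illustrative):

```sql
-- Stage 2: clustered, incrementally refreshable join of the two bronze tables.
CREATE OR REFRESH MATERIALIZED VIEW silver.target
CLUSTER BY (key)
AS SELECT /*+ BROADCAST(s) */
  b.*,
  s.col1,
  s.col2
FROM bronze.big_source b
JOIN bronze.small_source s ON b.key = s.key;

-- Stage 3 (optional): downstream transformations read from the joined MV.
CREATE OR REFRESH MATERIALIZED VIEW gold.target_enriched AS
SELECT * FROM silver.target
WHERE col1 IS NOT NULL;
```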
ABOUT THE BLOOM FILTER SUGGESTION
The previous reply suggested Bloom filters. While useful for some workloads, Databricks documentation notes that they are "not recommended for most workloads" and are primarily designed for selective equality filters on individual columns, not for optimizing joins. Liquid clustering and Predictive I/O are generally better alternatives for data skipping.
Docs: https://docs.databricks.com/en/optimizations/bloom-filters.html
SUMMARY (IN PRIORITY ORDER)
1. Switch Target to a materialized view with incremental refresh -- avoids full recomputation when either side changes (requires serverless compute and row tracking)
2. Add BROADCAST hint on small_source -- eliminates shuffle for the small side
3. Add CLUSTER BY on join key via intermediate table -- enables data skipping on big_source reads
4. Tune AQE broadcast threshold -- helps runtime join strategy selection
5. Consider two-stage pipeline architecture -- clean separation of concerns
DOCUMENTATION REFERENCES
- Streaming Tables in DLT: https://docs.databricks.com/en/ldp/streaming-tables.html
- Materialized Views: https://docs.databricks.com/en/ldp/materialized-views.html
- Incremental Refresh: https://docs.databricks.com/en/optimizations/incremental-refresh.html
- DLT Best Practices: https://docs.databricks.com/en/ldp/best-practices.html
- Liquid Clustering: https://docs.databricks.com/en/delta/clustering.html
- Adaptive Query Execution: https://docs.databricks.com/en/optimizations/aqe.html
Hope this helps! Let me know if you have questions about any of these approaches.
* This reply was drafted by an agent system I built, which researches responses against the documentation I have available and previous memory. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.