<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Expensive join in spark declarative pipeline against a Lakeflow connect table in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/152151#M53769</link>
    <description>&lt;P&gt;Hi Leopold,&lt;/P&gt;
&lt;P&gt;A broadcast join is 100% the first thing to try. It copies the smaller table to each node where the larger table is being processed, which avoids shuffling the large table. Here is an example of how this is done in SQL.&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT
  o.*,
  /*+ BROADCAST(p) */  -- broadcast the small dimension side
  p.product_name,
  p.category
FROM orders o
JOIN products p
  ON o.product_id = p.product_id;&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp; Thanks,&lt;BR /&gt;&lt;BR /&gt;Emma&lt;/P&gt;</description>
    <pubDate>Thu, 26 Mar 2026 11:08:42 GMT</pubDate>
    <dc:creator>emma_s</dc:creator>
    <dc:date>2026-03-26T11:08:42Z</dc:date>
    <item>
      <title>Expensive join in spark declarative pipeline against a Lakeflow connect table</title>
      <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149602#M53128</link>
      <description>&lt;P&gt;I'm trying to resolve an issue and I would like an expert opinion on what the right solution actually is.&lt;BR /&gt;I have a SQL Server instance with CDC and two tables: big_source (containing hundreds of millions of rows) and small_source (containing a small number of rows).&lt;BR /&gt;I have set up Lakeflow Connect to move these tables into our bronze layer as Delta tables with SCD1. After that I created an SDP pipeline which&lt;BR /&gt;moves data from bronze to silver into a single table, Target, which is essentially a join of the two.&lt;BR /&gt;My goal is also to propagate deletes and updates from the sources to Target. When a change occurs in big_source, this is fine, as I'm joining a small&lt;BR /&gt;stream of changes with the small_source table. However, if a change occurs in the small_source table, I'm joining a tiny stream of changes with the huge big_source table, and&lt;BR /&gt;I see big data movements (rows processed, bytes read) while processing tiny changes to small_source. What is the best way to optimize this operation to minimize data reads of big_source? (I tried to enable clustering on the Lakeflow table, but it seems this is not supported, so I created a new dp.table which merely mirrors big_source.) Thanks for any hint.&lt;/P&gt;</description>
      <pubDate>Mon, 02 Mar 2026 08:46:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149602#M53128</guid>
      <dc:creator>leopold_cudzik</dc:creator>
      <dc:date>2026-03-02T08:46:51Z</dc:date>
    </item>
    <item>
      <title>Re: Expensive join in spark declarative pipeline against a Lakeflow connect table</title>
      <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149605#M53129</link>
      <description>&lt;P&gt;Hi,&lt;BR /&gt;You can read about Bloom filter indexes. They can drastically decrease I/O while having a small footprint.&lt;BR /&gt;A Bloom filter tells Spark when a join key is definitely not in a file (no false negatives, so a negative answer is 100% correct), allowing that file to be skipped.&lt;BR /&gt;Bloom filters only work for equality (=) joins.&lt;BR /&gt;&lt;BR /&gt;I would recommend trying a combination of Liquid Clustering and Bloom filters.&lt;BR /&gt;Let me know if this improves your big data movement issues.&lt;/P&gt;</description>
      <pubDate>Mon, 02 Mar 2026 09:06:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149605#M53129</guid>
      <dc:creator>soloengine</dc:creator>
      <dc:date>2026-03-02T09:06:45Z</dc:date>
    </item>
    <item>
      <title>Re: Expensive join in spark declarative pipeline against a Lakeflow connect table</title>
      <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149614#M53132</link>
      <description>&lt;P&gt;Should I be worried by this statement in the docs?&lt;BR /&gt;"Important: Azure Databricks doesn't recommend using Bloom filter indexes for most workloads."&lt;BR /&gt;&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/optimizations/bloom-filters" target="_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/optimizations/bloom-filters&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 02 Mar 2026 11:44:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/149614#M53132</guid>
      <dc:creator>leopold_cudzik</dc:creator>
      <dc:date>2026-03-02T11:44:26Z</dc:date>
    </item>
    <item>
      <title>Re: Expensive join in spark declarative pipeline against a Lakeflow connect table</title>
      <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/150087#M53234</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/217613"&gt;@leopold_cudzik&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Great question -- this is a classic challenge with stream-static joins in Spark Declarative Pipelines, and there are several concrete things you can do. Let me break it down.&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;UNDERSTANDING THE ROOT CAUSE&lt;/P&gt;
&lt;P&gt;The core issue is how streaming table joins work. When you define a streaming table that joins two sources, one side is read as a stream (the changed rows) and the other side is read as a snapshot. Per the Databricks documentation:&lt;/P&gt;
&lt;P&gt;"Joins in streaming tables do not recompute when dimensions change."&lt;/P&gt;
&lt;P&gt;This means when your small_source changes, the pipeline reads the small set of changed rows as a stream, but must scan the entire big_source as a static snapshot for the join. That is why you see huge data reads -- it is literally reading all of big_source to fulfill the join for just a few changed rows.&lt;/P&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/ldp/streaming-tables.html" target="_blank"&gt;https://docs.databricks.com/en/ldp/streaming-tables.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SOLUTION 1: USE A MATERIALIZED VIEW INSTEAD OF A STREAMING TABLE (RECOMMENDED)&lt;/P&gt;
&lt;P&gt;The most impactful change is to define your Target as a materialized view rather than a streaming table. Materialized views support incremental refresh -- the system can intelligently determine which portion of the output needs to be recomputed when either input changes.&lt;/P&gt;
&lt;LI-CODE lang="sql"&gt;CREATE OR REFRESH MATERIALIZED VIEW silver.target AS
SELECT /*+ BROADCAST(s) */
  b.*,
  s.col1,
  s.col2
FROM bronze.big_source b
JOIN bronze.small_source s ON b.key = s.key;&lt;/LI-CODE&gt;
&lt;P&gt;Key benefits:&lt;BR /&gt;- Incremental refresh for joins is supported for INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins&lt;BR /&gt;- When small_source changes, the engine can identify only the affected rows and update them, rather than scanning all of big_source&lt;BR /&gt;- Both streaming tables and Delta tables are supported as inputs&lt;/P&gt;
&lt;P&gt;Important requirement: Incremental refresh for materialized views requires serverless compute and row-tracking enabled on the source tables. On classic compute, the MV does a full recompute each time.&lt;/P&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/optimizations/incremental-refresh.html" target="_blank"&gt;https://docs.databricks.com/en/optimizations/incremental-refresh.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SOLUTION 2: BROADCAST THE SMALL TABLE&lt;/P&gt;
&lt;P&gt;Regardless of streaming table or materialized view, you should broadcast the small table. This tells Spark to send the small table to every executor rather than shuffling both sides:&lt;/P&gt;
&lt;LI-CODE lang="sql"&gt;SELECT /*+ BROADCAST(small_source) */
  b.*, s.*
FROM STREAM(bronze.big_source) b
JOIN bronze.small_source s ON b.key = s.key;&lt;/LI-CODE&gt;
&lt;P&gt;This can significantly reduce shuffle I/O.&lt;/P&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/ldp/best-practices.html" target="_blank"&gt;https://docs.databricks.com/en/ldp/best-practices.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SOLUTION 3: ADD LIQUID CLUSTERING TO THE BIG SOURCE TABLE&lt;/P&gt;
&lt;P&gt;You mentioned that clustering on the Lakeflow Connect table directly is not supported. This is correct -- Lakeflow Connect manages those streaming tables. However, you can create an intermediate clustered copy:&lt;/P&gt;
&lt;LI-CODE lang="sql"&gt;CREATE OR REFRESH STREAMING TABLE bronze.big_source_clustered
CLUSTER BY (join_key_column)
AS SELECT * FROM STREAM(bronze.big_source);&lt;/LI-CODE&gt;
&lt;P&gt;Then use bronze.big_source_clustered in your join. This enables data skipping on the join key, so Spark can skip files that do not contain matching keys.&lt;/P&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/delta/clustering.html" target="_blank"&gt;https://docs.databricks.com/en/delta/clustering.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SOLUTION 4: LEVERAGE ADAPTIVE QUERY EXECUTION (AQE)&lt;/P&gt;
&lt;P&gt;AQE is enabled by default and automatically handles several join optimizations:&lt;/P&gt;
&lt;P&gt;- Auto broadcast conversion: If one side is small enough (default &amp;lt; 30MB), AQE converts to a broadcast hash join at runtime&lt;BR /&gt;- Skew join handling: AQE detects and splits skewed partitions&lt;BR /&gt;- Partition coalescing: Combines small post-shuffle partitions&lt;/P&gt;
&lt;P&gt;You can tune AQE with pipeline-level Spark configs:&lt;/P&gt;
&lt;LI-CODE lang="yaml"&gt;spark.databricks.adaptive.autoBroadcastJoinThreshold: "100MB"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.sql.shuffle.partitions: "auto"&lt;/LI-CODE&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/optimizations/aqe.html" target="_blank"&gt;https://docs.databricks.com/en/optimizations/aqe.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SOLUTION 5: TWO-STAGE PIPELINE ARCHITECTURE&lt;/P&gt;
&lt;P&gt;If none of the above fully resolve the issue, consider restructuring:&lt;/P&gt;
&lt;P&gt;1. Stage 1 (Streaming Tables): Ingest both sources via Lakeflow Connect (you already have this)&lt;BR /&gt;2. Stage 2 (Materialized View): Join the two bronze tables with CLUSTER BY on the join key and a BROADCAST hint&lt;BR /&gt;3. Stage 3 (Optional): Chain additional transformations downstream&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;ABOUT THE BLOOM FILTER SUGGESTION&lt;/P&gt;
&lt;P&gt;The previous reply suggested Bloom filters. While useful for some workloads, Databricks documentation notes that they are "not recommended for most workloads" and are primarily designed for selective equality filters on individual columns, not for optimizing joins. Liquid clustering and Predictive I/O are generally better alternatives for data skipping.&lt;/P&gt;
&lt;P&gt;Docs: &lt;A href="https://docs.databricks.com/en/optimizations/bloom-filters.html" target="_blank"&gt;https://docs.databricks.com/en/optimizations/bloom-filters.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;SUMMARY (IN PRIORITY ORDER)&lt;/P&gt;
&lt;P&gt;1. Switch Target to a materialized view with incremental refresh -- eliminates full scans when either side changes&lt;BR /&gt;2. Add BROADCAST hint on small_source -- eliminates shuffle for the small side&lt;BR /&gt;3. Add CLUSTER BY on join key via intermediate table -- enables data skipping on big_source reads&lt;BR /&gt;4. Tune AQE broadcast threshold -- helps runtime join strategy selection&lt;BR /&gt;5. Consider two-stage pipeline architecture -- clean separation of concerns&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;DOCUMENTATION REFERENCES&lt;/P&gt;
&lt;P&gt;- Streaming Tables in DLT: &lt;A href="https://docs.databricks.com/en/ldp/streaming-tables.html" target="_blank"&gt;https://docs.databricks.com/en/ldp/streaming-tables.html&lt;/A&gt;&lt;BR /&gt;- Materialized Views: &lt;A href="https://docs.databricks.com/en/ldp/materialized-views.html" target="_blank"&gt;https://docs.databricks.com/en/ldp/materialized-views.html&lt;/A&gt;&lt;BR /&gt;- Incremental Refresh: &lt;A href="https://docs.databricks.com/en/optimizations/incremental-refresh.html" target="_blank"&gt;https://docs.databricks.com/en/optimizations/incremental-refresh.html&lt;/A&gt;&lt;BR /&gt;- DLT Best Practices: &lt;A href="https://docs.databricks.com/en/ldp/best-practices.html" target="_blank"&gt;https://docs.databricks.com/en/ldp/best-practices.html&lt;/A&gt;&lt;BR /&gt;- Liquid Clustering: &lt;A href="https://docs.databricks.com/en/delta/clustering.html" target="_blank"&gt;https://docs.databricks.com/en/delta/clustering.html&lt;/A&gt;&lt;BR /&gt;- Adaptive Query Execution: &lt;A href="https://docs.databricks.com/en/optimizations/aqe.html" target="_blank"&gt;https://docs.databricks.com/en/optimizations/aqe.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;Hope this helps! Let me know if you have questions about any of these approaches.&lt;/P&gt;
&lt;P&gt;* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.&lt;/P&gt;</description>
      <pubDate>Sat, 07 Mar 2026 20:16:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/150087#M53234</guid>
      <dc:creator>SteveOstrowski</dc:creator>
      <dc:date>2026-03-07T20:16:41Z</dc:date>
    </item>
    <item>
      <title>Re: Expensive join in spark declarative pipeline against a Lakeflow connect table</title>
      <link>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/152151#M53769</link>
      <description>&lt;P&gt;Hi Leopold,&lt;/P&gt;
&lt;P&gt;A broadcast join is 100% the first thing to try. It copies the smaller table to each node where the larger table is being processed, which avoids shuffling the large table. Here is an example of how this is done in SQL.&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT
  o.*,
  /*+ BROADCAST(p) */  -- broadcast the small dimension side
  p.product_name,
  p.category
FROM orders o
JOIN products p
  ON o.product_id = p.product_id;&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp; Thanks,&lt;BR /&gt;&lt;BR /&gt;Emma&lt;/P&gt;</description>
      <pubDate>Thu, 26 Mar 2026 11:08:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/expensive-join-in-spark-declarative-pipeline-against-a-lakeflow/m-p/152151#M53769</guid>
      <dc:creator>emma_s</dc:creator>
      <dc:date>2026-03-26T11:08:42Z</dc:date>
    </item>
  </channel>
</rss>

