Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks optimization for query performance and pipeline runs

sai_sakhamuri
Databricks Partner

I am currently working on optimizing several Spark pipelines and wanted to gather community insights on advanced performance tuning. Typically, my workflow for traditional SQL optimization involves a deep dive into the execution plan to identify bottlenecks and develop a targeted optimization strategy. Within Spark, I’ve focused heavily on pipeline optimization, specifically addressing join performance through broadcast joins and mitigating shuffle delays.

I am interested in moving beyond these standard techniques. Specifically, I’ve been hearing from other developers about the Spark Catalyst Optimizer and the Tungsten Engine. I’m curious if anyone has practical experience or "under-the-hood" tips on how to better leverage these engines—or if there are other architectural optimizations (like Adaptive Query Execution) that you’ve found effective for high-volume ETL.

1 ACCEPTED SOLUTION

Accepted Solutions

lingareddy_Alva
Esteemed Contributor

Hi @sai_sakhamuri 

You're clearly past the basics. Let me give you a practitioner-level breakdown of each layer you mentioned, plus a few things that often get overlooked.


Spark Catalyst Optimizer — Working With the Rules Engine
Catalyst operates in four phases: Analysis → Logical Optimization → Physical Planning → Code Generation. Most developers only think about the physical plan, but the biggest leverage is earlier.
Practical tips:
Predicate pushdown is not always automatic. When reading from JDBC sources or custom data source V2 connectors, Catalyst can't always push filters down. Explicitly filter before any joins/aggregations and verify with df.explain("extended") that predicates appear in the PushedFilters section.
Column pruning breaks with select *. If you use SELECT * anywhere upstream, Catalyst cannot prune unused columns from file reads. Always project only what you need, as early as possible.
Avoid UDFs when a native expression exists. Catalyst cannot optimize inside a UDF — it's a black box. Even simple UDFs defeat predicate pushdown on the columns they touch. Prefer expr(), when/otherwise, and functions.* equivalents.
Use explain("cost") (Spark 3+) to see the row count and size estimates Catalyst is working with. If its estimates are wildly off, your statistics are stale — ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS fixes this for Hive metastore tables.

Tungsten Engine — The Binary/Codegen Layer
Tungsten is responsible for off-heap memory management and whole-stage code generation (WSCG). It compiles operator chains into a single tight Java bytecode loop, eliminating virtual dispatch and intermediate object allocation.

Practical tips:
Watch for WSCG breaking. Some operators like Window, certain aggregate types, and very wide schemas (200+ columns) fall back to the interpreted path. explain() will show *(1) Project for WSCG-compiled stages and a plain Project for fallback. If you see a lot of unstarred operators on a hot path, that's a regression worth investigating.
Control off-heap memory explicitly. Set spark.memory.offHeap.enabled=true and spark.memory.offHeap.size if your executors are under GC pressure. Tungsten's binary format avoids GC entirely for intermediate data, which matters enormously for large sort/agg operations.
Avoid Kryo serialization for columnar operations. Tungsten's binary row format and Kryo don't mix well. Keep Kryo for RDD-based legacy code; for DataFrames/Datasets, let Tungsten manage serialization.

Adaptive Query Execution (AQE) — Your Best Friend for ETL
AQE (enabled by default in Spark 3.2+) re-optimizes the physical plan at runtime using actual partition statistics after each shuffle stage. For high-volume ETL, this is often the single highest-ROI lever.

The three core features and the configs that control them:
Partition coalescing: spark.sql.adaptive.coalescePartitions.enabled merges many small post-shuffle partitions up toward spark.sql.adaptive.advisoryPartitionSizeInBytes.
Runtime join-strategy switching: governed by the master switch spark.sql.adaptive.enabled; AQE can demote a sort-merge join to a broadcast join once it sees actual post-shuffle sizes (tunable via spark.sql.adaptive.autoBroadcastJoinThreshold in 3.2+).
Skew-join handling: spark.sql.adaptive.skewJoin.enabled detects oversized partitions during joins and splits them into smaller tasks.

One often-missed AQE gotcha: AQE only activates after a shuffle boundary. If your pipeline has no shuffles (e.g., it's a pure filter + project on a partitioned table), AQE does nothing. Structure your stages to give AQE observable shuffle stages to work with.


Other Architectural Levers Worth Exploring
Dynamic Partition Pruning (DPP): When joining a large fact table to a small dimension table, Spark 3 can push the dimension's filter as a subquery into the fact table scan — eliminating entire file partitions before they're read. Requires spark.sql.optimizer.dynamicPartitionPruning.enabled=true and a broadcast-eligible dimension side.
Bucketing for repeat joins: If you join the same two large tables repeatedly (e.g., in a daily ETL), bucketing on the join key eliminates the shuffle entirely on every subsequent run. The upfront write cost pays off fast.
Z-ordering / liquid clustering (Delta Lake): If you're on Delta, Z-ordering co-locates related data physically, which compounds with DPP and file skipping. Liquid Clustering (Delta 3.1+) makes this maintenance-free.
Scan coalescing via file layout: Thousands of small files kill scan performance before any optimizer can help. A periodic compaction job (or OPTIMIZE on Delta) is often worth more than any query-level tuning.

The general diagnostic workflow I'd suggest: run explain("formatted") on your slowest stages, look for broken WSCG, check AQE's runtime plan diff (Spark UI → SQL tab → "AQE plan"), then verify statistics freshness. Most ETL regressions trace back to one of those three things.

 

 

 

 

 
