cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Reading Spark UI: A Repeatable Guide to Finding Performance Bottlenecks

Ashwin_DSA
Databricks Employee
Databricks Employee

A question came up in the community recently that I thought deserved more than a short answer. The question was around how to build a reliable investigation sequence for slow Spark jobs, specifically when symptoms overlap. A long-running stage with high spill and a few slow tasks could be data skew, insufficient executor memory, too few partitions, or an inefficient join strategy. The Spark UI has all the information you need to tell them apart, but only if you know where to look and in what order.

I put together a lab notebook with three intentionally broken jobs, one per bottleneck type, ran them on a Databricks cluster with Photon disabled to expose the classic Spark signatures, and captured every screenshot from a real run. This post walks through what each bottleneck looks like in the UI, the fix, and how to confirm the fix actually worked. The goal is a sequence you can apply directly the next time a stage takes longer than it should.

Key Takeaways
  • Start in the Stages tab. Find the slowest stage, then ask three questions in order before touching any configuration.
  • Data skew, memory pressure, and underparallelism each produce a distinct Spark UI signature. The Max/Median duration ratio and the spill distribution are the fastest discriminators.
  • A single faster run is not validation. The underlying metric (GC time, spill, task ratio) must move, not just wall-clock time.
  • Enable AQE before doing any manual tuning. It resolves a large fraction of shuffle partition and broadcast join problems automatically.

The investigation sequence

Before clicking anything, open the Stages tab and sort by Duration descending. Pick the longest stage. Everything else is noise until you understand that one stage.

Inside the stage, you need three numbers before drawing any conclusions:

  • Median task duration
  • Max task duration
  • Median vs Max shuffle read size per task

The ratio of Max to Median is your first discriminator. From there, three questions applied in order will identify the primary bottleneck in the large majority of cases.

Question 1: Is Max Duration more than 5x Median? If yes, check whether shuffle read bytes are also skewed. Both conditions together indicate data skew.

Question 2: Does spill appear on most tasks (not just outliers)? Is GC time above 10% in the Executors tab? If yes, it is memory pressure.

Question 3: Is task count well below 2x your executor core count? If yes, it is underparallelism.

If none of those fit, open the SQL / DataFrame tab and look at the physical plan. Missing predicate pushdown, unexpected cross joins, or a sort-merge join where broadcast would work are the next places to look.


Scenario 1: Data skew

The lab job joins a 20 million-row fact table where 70% of rows share the same join key (key=1) against a 20-row reference table. Broadcast is explicitly disabled to force a sort-merge join, which means all 20 million rows must be shuffled and sorted by join key before the merge. The partition holding key=1 receives 14 million rows. Every other partition receives a handful.

from pyspark.sql import functions as F

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # force sort-merge join

skew_data = (
    spark.range(20_000_000)
    .withColumn("join_key",
        F.when(F.rand() < 0.70, F.lit(1))
         .when(F.rand() < 0.85, F.lit(2))
         .otherwise((F.rand() * 18 + 3).cast("int")))
    .withColumn("value", F.rand() * 1000)
    .withColumn("payload", F.expr("repeat(cast(rand() as string), 50)"))
)

ref_data = (
    spark.range(1, 21)
    .withColumnRenamed("id", "join_key")
    .withColumn("label", F.concat(F.lit("cat_"), F.col("join_key").cast("string")))
)

result = (
    skew_data.join(ref_data, "join_key", "inner")
    .groupBy("join_key", "label")
    .agg(F.sum("value").alias("total"), F.count("*").alias("cnt"))
)
result.write.format("noop").mode("overwrite").save()

Task Metrics: the 24x ratio

Ashwin_DSA_0-1782413970227.png

 

Stage 2 Task Metrics. Median duration 38ms, Max 0.9s: a 24x ratio. Shuffle Read is 0 bytes on 199 of 200 partitions; one partition holds the hot key.

The Task Metrics summary table is the most important screen in the Spark UI for diagnosing skew. Two rows matter here:

  • Duration: Median 38ms, Max 0.9s. A 24x ratio. The 75th percentile sits at 53ms, meaning the outlier is not just slightly above average. It is categorically different from the rest of the distribution.
  • Shuffle Read Size: Median 0 bytes / 0 records across 199 of 200 tasks. The Max partition receives all the data. Most tasks have nothing to process.

This is the defining skew signature: a bimodal distribution where the vast majority of tasks finish in milliseconds and one task runs for orders of magnitude longer.

Event Timeline: the visual tell

Ashwin_DSA_1-1782413989929.png
Event Timeline for Stage 2. A handful of long green (Executor Computing Time) bars at the top, followed by 190+ short bars. This bimodal shape is the skew signature.

The Event Timeline converts the numeric ratio into something immediately visual. Long green bars indicate CPU time spent processing the hot key partition. The colour here matters: green is Executor Computing Time, confirming the slow tasks are CPU-bound on data processing, not waiting on I/O or network. Memory pressure and underparallelism do not produce this bimodal shape, which makes it the fastest visual discriminator between the three bottleneck types.

DAG Visualization: confirming the join strategy

Ashwin_DSA_2-1782414031747.pngDAG for Stage 2. Two Exchange (shuffle) nodes feed into two Sort operations, which merge via SortMergeJoin inside WholeStageCodegen. Both sides were fully shuffled and sorted by join key, making skew on the join key directly visible as a task outlier.

The DAG confirms the mechanism. Two Exchange nodes (one per join side) followed by Sort operations and a SortMergeJoin. This is the join strategy that makes skew dangerous: every row for key=1 lands on the same partition, and one task must process all of it alone.


Scenario 1b: Broadcast join fix

The reference table has 20 rows. It fits comfortably in executor memory. Enabling broadcast replicates it to every executor, eliminating the need to shuffle the join key entirely. The SortMergeJoin and its two Exchange stages disappear from the plan.

# Re-enable broadcast: small table replicated to every executor, no shuffle on join key
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))

result = (
    skew_data.join(F.broadcast(ref_data), "join_key", "inner")
    .groupBy("join_key", "label")
    .agg(F.sum("value").alias("total"), F.count("*").alias("cnt"))
)
result.write.format("noop").mode("overwrite").save()

Jobs comparison: the numbers

Ashwin_DSA_3-1782414084869.pngJobs tab. Scenario 1 (no broadcast): 18s, 4 stages, 408 tasks. Scenario 1b (broadcast): 5s, 2 stages, 204 tasks. Half the stages, half the tasks, 3.6x faster.

The improvement is not incremental. The fix collapses two of the four stages entirely. In production, where a skewed sort-merge join might anchor a 30-minute stage, this is the difference between a job completing before business hours and one that misses its SLA.

Stages after the fix

Ashwin_DSA_4-1782414108177.pngScenario 1b Stages. Two stages: a 4-task scan and a 200-task aggregation shuffle. The 200-task shuffle join stage that produced the 24x task ratio is absent.

The fix did not make the slow task faster. It eliminated the stage that contained the slow task. That is a fundamentally different kind of improvement and it is the one to aim for with skew. If the data allows it, removing the shuffle is better than optimizing within it.

When broadcast is not an option: if the smaller side of the join is too large to broadcast (typically above 500MB to 1GB depending on executor memory), the next tool is salting. Add a random suffix to the hot key before the join to distribute its rows across multiple partitions, then strip the suffix in a second pass. AQE's skew join optimization (spark.sql.adaptive.skewJoin.enabled) automates a version of this when the skewed partition exceeds the configured threshold.

Scenario 2: Memory pressure

The lab job processes 4 million wide rows, each approximately 400 bytes of string data across five columns, shuffled into only 20 partitions. Each task receives roughly 8 MB of data. A collect_list aggregation forces each task to hold the full list of strings in heap before writing the result. The combination of large per-task data and an in-memory accumulator produces GC pressure across all tasks.

spark.conf.set("spark.sql.shuffle.partitions", "20")  # intentionally too few

wide_data = (
    spark.range(4_000_000)
    .withColumn("group_key", (F.col("id") % 20).cast("int"))
    .withColumn("col_a", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_b", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_c", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_d", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_e", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("metric", F.rand() * 1000)
)

result = (
    wide_data
    .repartition(20, "group_key")
    .groupBy("group_key")
    .agg(
        F.sum("metric").alias("total"),
        F.collect_list("col_a").alias("all_a"),  # forces large in-memory buffer
        F.count("*").alias("cnt")
    )
)
result.write.format("noop").mode("overwrite").save()

Stages: large shuffle volume, few tasks

Ashwin_DSA_5-1782414148744.pngScenario 2 Stages. Stage 8: 20 tasks, 173.5 MiB shuffle read, 12 seconds. Compare to Scenario 1's Stage 2: 200 tasks, 7 KiB shuffle read, 9 seconds. The absolute shuffle volume flags the problem before you even click in.

The 173.5 MiB total shuffle read across 20 tasks means each task processes approximately 8.7 MiB of serialized data, before accounting for the deserialized in-memory representation which is larger. This is where the memory constraint originates.

Task Metrics: GC time is the signal

Ashwin_DSA_6-1782414171745.pngStage 8 Task Metrics. Duration: Median 2s, Max 9s (4.5x ratio, much tighter than skew's 24x). GC Time: Max 4s on a 9s task, 44% of task time in garbage collection. Shuffle Read Median is 8.3 MiB / 200k records per task, confirming large uniform per-task data volumes.

Two things separate this from skew:

  • Duration ratio: Max/Median is 4.5x, compared to 24x in Scenario 1. All the heavy tasks are slow together, not one outlier dragging the stage.
  • GC Time: Max 4 seconds on a 9-second task. That is 44% of task time in garbage collection. The JVM is constantly reclaiming the large string arrays built by collect_list. This is the clearest memory pressure indicator available in the Spark UI.

Note there is no Spill row in this screenshot. On this cluster with 10.7 GiB executor memory, the data stays in heap but causes severe GC pressure. In production with smaller executor memory relative to partition size, the same root cause produces disk spill instead. The fix is identical in both cases: reduce per-task data volume.

Executors tab note: on a multi-executor cluster, check the Executors tab. The GC Time column shows cumulative GC per executor. Uniformly high GC across all executors points to a partition sizing problem. GC concentrated on specific executors points to uneven data distribution.

Scenario 2b: Partition fix

The fix raises shuffle partitions from 20 to 400 and removes the collect_list aggregation. Each task now receives a much smaller data chunk, and no large in-memory buffers are built.

spark.conf.set("spark.sql.shuffle.partitions", "400")

result = (
    wide_data
    .repartition(400, "group_key")
    .groupBy("group_key")
    .agg(F.sum("metric").alias("total"), F.count("*").alias("cnt"))
    # collect_list removed: no large in-memory accumulator
)
result.write.format("noop").mode("overwrite").save()
Ashwin_DSA_11-1782414614629.pngScenario 2b Stages. Stage 10: 400 tasks, 39.0 MiB shuffle read (down from 173.5 MiB), 6 seconds (down from 12 seconds).
Ashwin_DSA_12-1782414635602.pngScenario 2b Task Metrics. GC Time: 0ms across all percentiles including Max. Duration: Median 10ms, Max 0.2s. The root cause is eliminated, not just reduced.
Metric 20 partitions 400 partitions
Stage duration 12s 6s
Median task duration 2s 10ms
Max task duration 9s 0.2s
Max GC Time 4s (44% of task) 0ms
Total shuffle read 173.5 MiB 39.0 MiB

GC time dropping to zero is the validation signal. Not the wall-clock improvement (which is real), but the fact that the JVM has nothing to collect. The objects that were triggering pressure no longer exist at that size in heap.


Scenario 3: Underparallelism

The lab job runs a 3 million-row aggregation after repartitioning to 4 partitions. There is no skew and no spill. The cluster is simply not given enough tasks to use its available cores.

spark.conf.set("spark.sql.shuffle.partitions", "4")  # intentionally too low

under_data = (
    spark.range(3_000_000)
    .withColumn("category", (F.rand() * 100).cast("int").cast("string"))
    .withColumn("value", F.rand() * 500)
)

result = (
    under_data
    .repartition(4)  # 4 tasks regardless of cluster size
    .groupBy("category")
    .agg(F.avg("value").alias("avg_val"), F.count("*").alias("cnt"))
    .orderBy("cnt", ascending=False)
)
result.write.format("noop").mode("overwrite").save()

# Fix: raise partitions to match data volume
spark.conf.set("spark.sql.shuffle.partitions", "100")
result = (
    under_data.repartition(100)
    .groupBy("category")
    .agg(F.avg("value").alias("avg_val"), F.count("*").alias("cnt"))
    .orderBy("cnt", ascending=False)
)
result.write.format("noop").mode("overwrite").save()
Ashwin_DSA_13-1782414658971.pngScenario 3 Stages. Three stages, all with 4/4 tasks. On a production cluster with 32 or 64 cores, the same configuration leaves the vast majority of the cluster idle throughout the job.

The underparallelism tell in the Stages tab is the task count. If every stage runs a small, fixed number of tasks regardless of data volume, the shuffle partition count is almost certainly the constraint. Check spark.sql.shuffle.partitions and compare it to 2x your executor core count as a starting floor.

Unlike skew and memory pressure, underparallelism produces no spill, no GC pressure, and a tight Max/Median ratio. All tasks run cleanly. The job simply uses a fraction of available parallelism, so it takes proportionally longer than it needs to.

Ashwin_DSA_14-1782414675999.pngScenario 3b Stages. 100 tasks in the aggregation stage, 94 in the sort stage. On a production cluster with 32 cores, the 4-task version wastes 28 cores per wave. The 100-task version keeps the cluster busy and completes in roughly 1/8 the wall-clock time.

Decision map and practical thresholds

Symptom Root cause First action
Max/Median > 5x, shuffle read skewed Data skew Broadcast join if small side fits; salting if not
Spill on most tasks or GC > 10% Memory pressure More partitions before adding executor memory
Task count < 2x executor cores Underparallelism Raise spark.sql.shuffle.partitions or add repartition()
None of the above Plan problem SQL tab: check for cross joins, missing predicate pushdown, wrong join strategy

Thresholds experienced teams use

  • Skew threshold: any task reading more than 3x the median shuffle bytes warrants investigation. AQE's skew join optimization fires at 256MB by default. Lowering it to 64MB catches problems earlier: spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=67108864
  • Spill threshold: any disk spill is worth addressing. Even 100MB of spill indicates the executor memory-to-partition-size ratio is wrong.
  • GC threshold: GC time above 10% of task time in the Executors tab means memory is a primary constraint. Above 20%, it is the dominant cause of slowness.
  • Broadcast threshold: spark.sql.autoBroadcastJoinThreshold defaults to 10MB. In practice, tables up to 500MB to 1GB broadcast safely on modern clusters if executor memory is adequate. Check the physical plan in the SQL tab to confirm which join strategy Spark chose.
  • Partition sizing: target 128 to 256MB of input data per partition for shuffle-heavy stages. For compute-intensive stages with complex UDFs, target 64MB to avoid GC pressure from large in-memory objects.
Enable AQE first: set spark.sql.adaptive.enabled=true before doing any manual tuning. AQE resolves the majority of shuffle partition count and broadcast join problems automatically. Use the investigation sequence above for what AQE does not fix: severe skew on low-cardinality keys, memory pressure from large individual task sizes, and first-stage underparallelism before AQE has seen statistics.

Validating the fix

A single faster run is not enough. Here is what to check after applying a fix:

  1. The metric moved, not just the clock. If you salted for skew, confirm the Max/Median ratio dropped below 2x. If you increased partitions for memory pressure, confirm GC time dropped to zero or near zero, not just reduced. Wall-clock improvement with no change in the underlying metric means something else is limiting performance.
  2. Spill is zero, not reduced. Spill dropping from 10GB to 2GB means you improved the memory-to-partition ratio but did not fully resolve it. The correct partition size produces zero spill.
  3. Run on a cold cache. The second run of a job often benefits from cached shuffle files from the first run. Force a fresh run by clearing cache or changing the input path, otherwise the improvement may be partially artificial.
  4. Run at full production data volume. Skew and memory pressure are data-volume-dependent. A fix that works on a 10% sample frequently fails at full volume because the skewed key's frequency is nonlinear and per-partition data volume changes.
  5. Check across a time window. For recurring jobs, pull 7 days of run history after applying the fix and confirm duration variance dropped. A job that averages fast but spikes 3x on weekends still has a skew or partition imbalance problem the average obscures.

Hope this is useful for diagnosing your slow stages. If you have additional techniques or thresholds that work well in your environment, please share them in the comments. These patterns improve with more data points.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***
0 REPLIES 0