A question came up in the community recently that I thought deserved more than a short answer. The question was how to build a reliable investigation sequence for slow Spark jobs, specifically when symptoms overlap. A long-running stage with high spill and a few slow tasks could be data skew, insufficient executor memory, too few partitions, or an inefficient join strategy. The Spark UI has all the information you need to tell them apart, but only if you know where to look and in what order.
I put together a lab notebook with three intentionally broken jobs, one per bottleneck type, ran them on a Databricks cluster with Photon disabled to expose the classic Spark signatures, and captured every screenshot from a real run. This post walks through what each bottleneck looks like in the UI, the fix, and how to confirm the fix actually worked. The goal is a sequence you can apply directly the next time a stage takes longer than it should.
Before clicking anything, open the Stages tab and sort by Duration descending. Pick the longest stage. Everything else is noise until you understand that one stage.
Inside the stage, you need three numbers before drawing any conclusions:
The ratio of Max to Median is your first discriminator. From there, three questions applied in order will identify the primary bottleneck in the large majority of cases.
If none of those fit, open the SQL / DataFrame tab and look at the physical plan. Missing predicate pushdown, unexpected cross joins, or a sort-merge join where broadcast would work are the next places to look.
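A minimal sketch of that plan check, assuming you already have the physical plan as text (for example copied from the SQL / DataFrame tab or printed by DataFrame.explain()). The function name plan_red_flags and the sample plan string are hypothetical; the operator names it searches for are the ones Spark prints in physical plans. It is a plain string scan, not a plan parser, so treat a hit as a prompt to read the plan, not as a verdict.

```python
import re


def plan_red_flags(plan_text):
    """Return a list of warnings for a Spark physical plan given as text."""
    flags = []
    if "CartesianProduct" in plan_text or "BroadcastNestedLoopJoin" in plan_text:
        flags.append("cross or non-equi join")
    if "SortMergeJoin" in plan_text:
        flags.append("sort-merge join: check whether one side is small enough to broadcast")
    # Scan nodes list the filters pushed to the source; an empty list means none were pushed.
    if re.search(r"PushedFilters: \[\s*\]", plan_text):
        flags.append("scan with no pushed filters")
    return flags


# Hypothetical, abbreviated plan text for illustration only.
sample_plan = """
SortMergeJoin [join_key#1], [join_key#2], Inner
:- Sort [join_key#1 ASC NULLS FIRST]
:  +- Exchange hashpartitioning(join_key#1, 200)
+- FileScan parquet [join_key#2] PushedFilters: [], ReadSchema: struct<join_key:int>
"""
flags = plan_red_flags(sample_plan)
print(flags)
```

An empty PushedFilters list is only a problem when the query actually filters on that table, so that flag in particular needs a human look.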
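The lab job joins a 20 million-row fact table where 70% of rows share the same join key (key=1) against a 20-row reference table. Broadcast is explicitly disabled to force a sort-merge join, which means all 20 million rows must be shuffled and sorted by join key before the merge. The partition holding key=1 receives about 14 million rows. Every other partition receives far fewer: as the job is written, key=2 takes roughly 5 million rows (85% of the remaining 30%), keys 3 through 20 share the last 900,000 or so, and with only 20 distinct keys most of the 200 shuffle partitions receive nothing.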
```python
from pyspark.sql import functions as F

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # force sort-merge join

skew_data = (
    spark.range(20_000_000)
    .withColumn(
        "join_key",
        F.when(F.rand() < 0.70, F.lit(1))
        .when(F.rand() < 0.85, F.lit(2))
        .otherwise((F.rand() * 18 + 3).cast("int")),
    )
    .withColumn("value", F.rand() * 1000)
    .withColumn("payload", F.expr("repeat(cast(rand() as string), 50)"))
)

ref_data = (
    spark.range(1, 21)
    .withColumnRenamed("id", "join_key")
    .withColumn("label", F.concat(F.lit("cat_"), F.col("join_key").cast("string")))
)

result = (
    skew_data.join(ref_data, "join_key", "inner")
    .groupBy("join_key", "label")
    .agg(F.sum("value").alias("total"), F.count("*").alias("cnt"))
)
result.write.format("noop").mode("overwrite").save()
```
Stage 2 Task Metrics. Median duration 38ms, Max 0.9s: a 24x ratio. Shuffle Read is 0 bytes on 199 of 200 partitions; one partition holds the hot key.
The Task Metrics summary table is the most important screen in the Spark UI for diagnosing skew. Two rows matter here:
This is the defining skew signature: a bimodal distribution where the vast majority of tasks finish in milliseconds and one task runs for orders of magnitude longer.
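To make the ratio concrete, here is a small sketch that computes it from a list of per-task durations. The helper name skew_ratio and the synthetic duration list are assumptions for illustration; only the 38ms median and 0.9s max come from the run described in this post.

```python
from statistics import median


def skew_ratio(durations_ms):
    """Max/Median ratio of task durations; a higher value means more skew."""
    med = median(durations_ms)
    if med == 0:
        return float("inf")  # every typical task finished in under a millisecond
    return max(durations_ms) / med


# Synthetic stand-in for 200 tasks: 199 finish in about 38 ms, one takes 900 ms.
durations_ms = [38] * 199 + [900]
ratio = skew_ratio(durations_ms)
print(f"Max/Median = {ratio:.1f}x")
```

With these inputs the ratio is 900 / 38, which rounds to the 24x quoted for Stage 2. The median is used instead of the mean because one extreme task would drag the mean upward and hide the gap.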
The Event Timeline converts the numeric ratio into something immediately visual. Long green bars indicate CPU time spent processing the hot key partition. The colour here matters: green is Executor Computing Time, confirming the slow tasks are CPU-bound on data processing, not waiting on I/O or network. Memory pressure and underparallelism do not produce this bimodal shape, which makes it the fastest visual discriminator between the three bottleneck types.
The DAG confirms the mechanism. Two Exchange nodes (one per join side) followed by Sort operations and a SortMergeJoin. This is the join strategy that makes skew dangerous: every row for key=1 lands on the same partition, and one task must process all of it alone.
The reference table has 20 rows. It fits comfortably in executor memory. Enabling broadcast replicates it to every executor, eliminating the need to shuffle the join key entirely. The SortMergeJoin and its two Exchange stages disappear from the plan.
```python
# Re-enable broadcast: small table replicated to every executor, no shuffle on join key
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))

result = (
    skew_data.join(F.broadcast(ref_data), "join_key", "inner")
    .groupBy("join_key", "label")
    .agg(F.sum("value").alias("total"), F.count("*").alias("cnt"))
)
result.write.format("noop").mode("overwrite").save()
```
The improvement is not incremental. The fix collapses two of the four stages entirely. In production, where a skewed sort-merge join might anchor a 30-minute stage, this is the difference between a job completing before business hours and one that misses its SLA.
The fix did not make the slow task faster. It eliminated the stage that contained the slow task. That is a fundamentally different kind of improvement and it is the one to aim for with skew. If the data allows it, removing the shuffle is better than optimizing within it.
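When the small side is too large to broadcast, salting is the usual fallback: a random suffix is added to the hot key so its rows spread over several partitions, and the other side of the join is replicated once per salt value so the rows still match. The sketch below is plain Python, not Spark, and only simulates the effect on partition sizes. NUM_SALTS, partition_of, and the toy hash are hypothetical stand-ins for Spark's hash partitioner, and the row count is scaled down to 100,000.

```python
import random
from collections import Counter

NUM_PARTITIONS = 200
NUM_SALTS = 16  # assumed salt range; tune to the degree of skew


def partition_of(key, salt=0):
    # Toy deterministic hash standing in for Spark's hash partitioning.
    return (key * 31 + salt * 7) % NUM_PARTITIONS


random.seed(0)
# 70% of rows carry key=1; the rest are spread over keys 2..20.
keys = [1 if random.random() < 0.70 else random.randint(2, 20) for _ in range(100_000)]

# Without salt every key=1 row lands in the same partition.
unsalted = Counter(partition_of(k) for k in keys)
# With salt each row picks one of NUM_SALTS sub-buckets for its key.
salted = Counter(partition_of(k, random.randrange(NUM_SALTS)) for k in keys)

print("largest partition, unsalted:", max(unsalted.values()))
print("largest partition, salted:  ", max(salted.values()))
```

The largest partition shrinks by roughly the salt factor. The cost is that the other side of the join grows by the same factor, which is why broadcast remains the first choice when the small side fits.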
Where broadcast is not possible, AQE skew join handling (spark.sql.adaptive.skewJoin.enabled) automates the alternative, splitting the skewed partition across several tasks once it exceeds the configured threshold.

The lab job processes 4 million wide rows, each approximately 400 bytes of string data across five columns, shuffled into only 20 partitions. Each task receives roughly 8 MB of data. A collect_list aggregation forces each task to hold the full list of strings in heap before writing the result. The combination of large per-task data and an in-memory accumulator produces GC pressure across all tasks.
```python
spark.conf.set("spark.sql.shuffle.partitions", "20")  # intentionally too few

wide_data = (
    spark.range(4_000_000)
    .withColumn("group_key", (F.col("id") % 20).cast("int"))
    .withColumn("col_a", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_b", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_c", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_d", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("col_e", F.expr("repeat(cast(rand() as string), 80)"))
    .withColumn("metric", F.rand() * 1000)
)

result = (
    wide_data
    .repartition(20, "group_key")
    .groupBy("group_key")
    .agg(
        F.sum("metric").alias("total"),
        F.collect_list("col_a").alias("all_a"),  # forces large in-memory buffer
        F.count("*").alias("cnt"),
    )
)
result.write.format("noop").mode("overwrite").save()
```
The 173.5 MiB total shuffle read across 20 tasks means each task processes approximately 8.7 MiB of serialized data, before accounting for the deserialized in-memory representation which is larger. This is where the memory constraint originates.
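The arithmetic behind that figure, as a small sketch. per_task_mib is a hypothetical helper; the inputs are the shuffle read totals and partition counts reported in this post, and the result is an average that assumes evenly sized partitions.

```python
def per_task_mib(total_shuffle_mib, num_partitions):
    """Average serialized shuffle read per task, assuming evenly sized partitions."""
    return total_shuffle_mib / num_partitions


before = per_task_mib(173.5, 20)  # the broken job: 20 shuffle partitions
after = per_task_mib(39.0, 400)   # the fixed job: 400 partitions, collect_list removed
print(f"{before:.1f} MiB per task before, {after:.2f} MiB per task after")
```

The second figure reflects both changes in the fix, more partitions and a smaller shuffle, so it overstates what raising the partition count alone would achieve.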
Two things separate this from skew. First, the pressure shows up across all tasks rather than in a single outlier. Second, GC Time is high, with a maximum of 4s (44% of the task), driven by the buffers that collect_list builds. This is the clearest memory pressure indicator available in the Spark UI.

Note there is no Spill row in this screenshot. On this cluster with 10.7 GiB executor memory, the data stays in heap but causes severe GC pressure. In production with smaller executor memory relative to partition size, the same root cause produces disk spill instead. The fix is identical in both cases: reduce per-task data volume.
The fix raises shuffle partitions from 20 to 400 and removes the collect_list aggregation. Each task now receives a much smaller data chunk, and no large in-memory buffers are built.
```python
spark.conf.set("spark.sql.shuffle.partitions", "400")

result = (
    wide_data
    .repartition(400, "group_key")
    .groupBy("group_key")
    .agg(F.sum("metric").alias("total"), F.count("*").alias("cnt"))
    # collect_list removed: no large in-memory accumulator
)
result.write.format("noop").mode("overwrite").save()
```
| Metric | 20 partitions | 400 partitions |
|---|---|---|
| Stage duration | 12s | 6s |
| Median task duration | 2s | 10ms |
| Max task duration | 9s | 0.2s |
| Max GC Time | 4s (44% of task) | 0ms |
| Total shuffle read | 173.5 MiB | 39.0 MiB |
GC time dropping to zero is the validation signal. Not the wall-clock improvement (which is real), but the fact that the JVM has nothing to collect. The objects that were triggering pressure no longer exist at that size in heap.
The lab job runs a 3 million-row aggregation after repartitioning to 4 partitions. There is no skew and no spill. The cluster is simply not given enough tasks to use its available cores.
```python
spark.conf.set("spark.sql.shuffle.partitions", "4")  # intentionally too low

under_data = (
    spark.range(3_000_000)
    .withColumn("category", (F.rand() * 100).cast("int").cast("string"))
    .withColumn("value", F.rand() * 500)
)

result = (
    under_data
    .repartition(4)  # 4 tasks regardless of cluster size
    .groupBy("category")
    .agg(F.avg("value").alias("avg_val"), F.count("*").alias("cnt"))
    .orderBy("cnt", ascending=False)
)
result.write.format("noop").mode("overwrite").save()

# Fix: raise partitions to match data volume
spark.conf.set("spark.sql.shuffle.partitions", "100")

result = (
    under_data.repartition(100)
    .groupBy("category")
    .agg(F.avg("value").alias("avg_val"), F.count("*").alias("cnt"))
    .orderBy("cnt", ascending=False)
)
result.write.format("noop").mode("overwrite").save()
```
The underparallelism tell in the Stages tab is the task count. If every stage runs a small, fixed number of tasks regardless of data volume, the shuffle partition count is almost certainly the constraint. Check spark.sql.shuffle.partitions and compare it to 2x your executor core count as a starting floor.
Unlike skew and memory pressure, underparallelism produces no spill, no GC pressure, and a tight Max/Median ratio. All tasks run cleanly. The job simply uses a fraction of available parallelism, so it takes proportionally longer than it needs to.
| Symptom | Root cause | First action |
|---|---|---|
| Max/Median > 5x, shuffle read skewed | Data skew | Broadcast join if small side fits; salting if not |
| Spill on most tasks or GC > 10% | Memory pressure | More partitions before adding executor memory |
| Task count < 2x executor cores | Underparallelism | Raise spark.sql.shuffle.partitions or add repartition() |
| None of the above | Plan problem | SQL tab: check for cross joins, missing predicate pushdown, wrong join strategy |
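The table can be read as an ordered decision procedure. The sketch below encodes it in plain Python. The function name triage, its parameters, and the idea of typing the numbers in by hand from the Stages tab are assumptions; the thresholds (5x, 10%, 2x cores) are the ones in the table. It leaves out the "shuffle read skewed" half of the first row, so treat its answer as a first guess to confirm in the UI.

```python
def triage(median_s, max_s, gc_fraction, spill_on_most_tasks, task_count, executor_cores):
    """Apply the table's checks in order and return the first matching root cause."""
    if median_s > 0 and max_s / median_s > 5:
        return "data skew"
    if spill_on_most_tasks or gc_fraction > 0.10:
        return "memory pressure"
    if task_count < 2 * executor_cores:
        return "underparallelism"
    return "plan problem"


# Durations and GC share are from the lab runs; core counts and the
# third example's durations are assumed for illustration.
skew_case = triage(0.038, 0.9, 0.0, False, task_count=200, executor_cores=16)
memory_case = triage(2.0, 9.0, 0.44, False, task_count=20, executor_cores=8)
under_case = triage(1.0, 1.2, 0.0, False, task_count=4, executor_cores=16)
print(skew_case, memory_case, under_case, sep=" | ")
```

The order matters: the memory pressure lab has a Max/Median of 4.5x, just under the skew threshold, and it is the GC share that identifies it.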
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=67108864 (64 MiB)

spark.sql.autoBroadcastJoinThreshold defaults to 10MB. In practice, tables up to 500MB to 1GB broadcast safely on modern clusters if executor memory is adequate. Check the physical plan in the SQL tab to confirm which join strategy Spark chose.

Set spark.sql.adaptive.enabled=true before doing any manual tuning. AQE resolves the majority of shuffle partition count and broadcast join problems automatically. Use the investigation sequence above for what AQE does not fix: severe skew on low-cardinality keys, memory pressure from large individual task sizes, and first-stage underparallelism before AQE has seen statistics.

A single faster run is not enough. Here is what to check after applying a fix:
Hope this is useful for diagnosing your slow stages. If you have additional techniques or thresholds that work well in your environment, please share them in the comments. These patterns improve with more data points.
3 hours ago
Thanks for this! Very insightful and detailed. Your sequence for diagnosing slow Spark jobs when symptoms overlap is exactly what I needed. Bookmarked this for my team.