Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Getting 'Multiple failures in stage materialization' error in one of my jobs with a notebook task

sandy_123
New Contributor II
I'm getting "Multiple failures in stage materialization". I tried using a powerful job cluster, but it did not work. Any suggestions on how I should fix it?
 
FYI: my DataFrame (uniq_rec_df) has around 30M rows.

 

Screenshot attached: sandy_123_0-1768844333336.png

 

 


2 REPLIES

Saritha_S
Databricks Employee

Hi @sandy_123 

The "Multiple failures in stage materialization" error at line 120 is most likely caused by a massive shuffle bottleneck.

Check the Spark UI and try to understand the reason for the failure, such as an RPC error, heartbeat timeout, etc.

Primary Issues:

  1. Window Function Computed Multiple Times (Lines 26-117)
  • The window function computing geo_routing_flag using partition by concat(...) forces Spark to shuffle ALL data by visitor ID
  • This happens separately for each data source, causing multiple expensive shuffles
  • If you have skewed visitor IDs (some with thousands of records), certain partitions become extremely large
  2. Memory Pressure from a Chain of Shuffles
  • By the time you reach the final write at line 120, accumulated shuffle operations have created memory pressure
  • Straggler tasks (some partitions processing 10-100x more data) cause the job to appear "stuck"
  3. Write Operation Triggering a Final Shuffle
  • The .mode("overwrite") operation is the final straw that triggers one more massive shuffle
  • If using dynamic partitioning, this multiplies the cost further

To overcome the issue, try caching the frequently used DataFrame, and see whether tuning the executor size through Spark configs, or increasing the number of executors, helps resolve the issue.

SteveOstrowski
Databricks Employee

Hi @sandy_123,

The "Multiple failures in stage materialization" error occurs when Spark executors fail to fetch shuffle data from other executors during a shuffle stage. With 30 million rows, your job is likely hitting resource limits during a shuffle-heavy operation (such as a join, groupBy, window function, or repartition). Here is a systematic approach to resolve this.

UNDERSTAND THE ROOT CAUSE

This error is a wrapper around multiple FetchFailedException errors within a single Spark stage. Common causes include:

1. Executors running out of memory during shuffle writes or reads
2. Executors running out of local disk space for shuffle spill files
3. Executors being lost or preempted (especially with spot/preemptible instances)
4. Network timeouts between executors during shuffle data transfer
5. Data skew causing one or more partitions to be far larger than others

INCREASE SHUFFLE PARTITIONS

The default value of spark.sql.shuffle.partitions is 200. With 30 million rows, depending on the width of your DataFrame, this may produce partitions that are too large. Increasing the partition count distributes the work more evenly:

spark.conf.set("spark.sql.shuffle.partitions", "1000")
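
As a rough sizing heuristic, you can derive a partition count that targets roughly 200 MB of shuffle data per partition. The numbers below are illustrative assumptions (~5 KB per row); replace them with your actual row width and row count:

```python
# Back-of-the-envelope shuffle partition sizing. The row width and row count
# here are assumptions for illustration -- adjust to your data.
estimated_shuffle_bytes = 30_000_000 * 5_000   # ~30M rows x ~5 KB/row (assumed)
target_partition_bytes = 200 * 1024 * 1024     # aim for ~200 MB per partition
num_partitions = max(200, estimated_shuffle_bytes // target_partition_bytes)
# In the notebook, you would then apply it with:
# spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))
print(num_partitions)  # 715 with the assumed numbers
```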

You can also try Adaptive Query Execution (AQE), which is enabled by default on Databricks Runtime 12.2 LTS and above. AQE automatically coalesces and splits shuffle partitions at runtime. Verify it is enabled:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

USE LARGER OR STORAGE-OPTIMIZED INSTANCES

Rather than adding more small worker nodes, switch to fewer but larger instances. This reduces cross-node shuffle traffic. Storage-optimized instance types (e.g., i3 series on AWS, L-series on Azure, n2-highmem on GCP) include local NVMe SSDs that provide fast scratch space for shuffle spill.

You can also add EBS shuffle volumes in the cluster configuration under Advanced Options to give executors more local disk for shuffle data.

CHECK FOR DATA SKEW

Data skew is one of the most frequent causes of stage materialization failures. If one partition has significantly more data than others, that executor can run out of memory or time out.

To check for skew, look at the Spark UI for the failing stage. Under the "Tasks" tab, compare the shuffle read/write sizes across tasks. If you see a few tasks with 10x or more data than the median, you have a skew problem.

Mitigations for skew:
- Enable AQE skew join handling (shown above)
- Add a salt column to your join key to distribute skewed keys across multiple partitions
- Filter out or pre-aggregate the skewed keys before the join
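
The salting idea in the second bullet can be sketched in plain Python (the visitor_id values and bucket count are illustrative; in PySpark you would add the salt as a column, e.g. with rand(), before the join):

```python
import random

# Sketch of key salting for a skewed join. Instead of joining on visitor_id
# directly, the large (skewed) side appends a random salt 0..N-1 to the key,
# and the small side is replicated into N copies so every salted row matches.
NUM_SALTS = 8

def salted_key(visitor_id: str) -> str:
    # Large side: each row lands in one of NUM_SALTS buckets.
    return f"{visitor_id}#{random.randrange(NUM_SALTS)}"

def exploded_keys(visitor_id: str) -> list:
    # Small side: replicate the key across all salt buckets.
    return [f"{visitor_id}#{s}" for s in range(NUM_SALTS)]

# One hot key now spreads across NUM_SALTS distinct join keys (and hence
# NUM_SALTS shuffle partitions instead of one):
buckets = {salted_key("hot_visitor") for _ in range(10_000)}
print(len(buckets))  # NUM_SALTS distinct salted keys
```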

ADD EXPLICIT REPARTITIONING BEFORE HEAVY OPERATIONS

If you know which operation is causing the failure, repartition the DataFrame before it:

uniq_rec_df = uniq_rec_df.repartition(500, "your_key_column")

This ensures a more even distribution of data across partitions before the shuffle-heavy step.

INCREASE EXECUTOR MEMORY AND OVERHEAD

If your executors are running out of memory during shuffle, increase the executor memory and its overhead. Note that executor sizing is a static setting: it must be set in the cluster's Spark config before the cluster starts, not via spark.conf.set in a running notebook:

spark.executor.memory 16g
spark.executor.memoryOverhead 4g

On job clusters, add these under the cluster's Spark Config (Advanced Options) in the job definition.
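
For context, Spark's documented default overhead is max(384 MiB, 10% of executor memory), so an explicit 4g overhead is a genuine increase over what a 16g executor gets by default:

```python
# Spark's default executor memory overhead, per the Spark configuration docs:
# max(384 MiB, executor_memory * spark.executor.memoryOverheadFactor),
# where the factor defaults to 0.10 for JVM workloads.
def default_overhead_mib(executor_memory_mib: int, factor: float = 0.10) -> int:
    return max(384, int(executor_memory_mib * factor))

print(default_overhead_mib(16 * 1024))  # 1638 MiB for a 16g executor
print(default_overhead_mib(1024))       # 384 MiB floor for a small executor
```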

ENABLE DISK SPILL AND RETRY SETTINGS

These settings make Spark more resilient to transient shuffle failures. Like the executor memory settings, they are read at executor startup, so set them in the cluster's Spark config rather than at runtime:

spark.shuffle.io.maxRetries 10
spark.shuffle.io.retryWait 30s
spark.shuffle.io.connectionTimeout 240s

RECOMMENDED DEBUGGING STEPS

1. Open the Spark UI for the failed job run (Job Runs page, click the run, then click the Spark UI link)
2. Go to the Stages tab and find the failed stage
3. Check the "Failure Reason" column for the specific underlying exception (FetchFailedException, OutOfMemoryError, ExecutorLostFailure, etc.)
4. Check the "Shuffle Read" and "Shuffle Write" sizes to understand the data volume
5. Look at the "Event Timeline" to see if executors were removed or lost during the stage

The specific underlying exception will guide which of the above solutions to prioritize. For example, if you see OutOfMemoryError, focus on memory settings and partitioning. If you see connection timeouts, focus on retry settings and using larger instances with local SSDs.
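
That triage logic can be summarized in a small lookup. The mapping below simply restates this thread's suggestions; the patterns are substrings to look for in the stage's failure reason, not exact Spark messages:

```python
# Map the underlying exception (as seen in the Spark UI "Failure Reason")
# to which mitigation from this thread to try first. Illustrative only.
TRIAGE = {
    "FetchFailedException": "increase shuffle partitions / enable AQE",
    "OutOfMemoryError": "raise executor memory and overhead; repartition",
    "ExecutorLostFailure": "avoid spot instances; use larger nodes",
    "connection timed out": "raise shuffle retry/timeout settings; local SSDs",
}

def first_step(failure_reason: str) -> str:
    for pattern, action in TRIAGE.items():
        if pattern in failure_reason:
            return action
    return "inspect the Spark UI stage failure reason"

print(first_step("org.apache.spark.shuffle.FetchFailedException: ..."))
```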

For more details on cluster sizing for shuffle-heavy workloads, see:
https://docs.databricks.com/en/compute/cluster-config-best-practices.html

For information on Adaptive Query Execution:
https://docs.databricks.com/en/optimizations/aqe.html

* This reply was drafted using an agent system I built, which researches and drafts responses based on the documentation I have available and previous memory. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand-new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.