Hi @sandy_123,
The "Multiple failures in stage materialization" error occurs when Spark executors fail to fetch shuffle data from other executors during a shuffle stage. With 30 million rows, your job is likely hitting resource limits during a shuffle-heavy operation (such as a join, groupBy, window function, or repartition). Here is a systematic approach to resolve this.
UNDERSTAND THE ROOT CAUSE
This error is a wrapper around multiple FetchFailedException errors within a single Spark stage. Common causes include:
1. Executors running out of memory during shuffle writes or reads
2. Executors running out of local disk space for shuffle spill files
3. Executors being lost or preempted (especially with spot/preemptible instances)
4. Network timeouts between executors during shuffle data transfer
5. Data skew causing one or more partitions to be far larger than others
INCREASE SHUFFLE PARTITIONS
The default value of spark.sql.shuffle.partitions is 200. With 30 million rows, depending on the width of your DataFrame, this may produce partitions that are too large. Increasing the partition count distributes the work more evenly:
spark.conf.set("spark.sql.shuffle.partitions", "1000")
You can also try Adaptive Query Execution (AQE), which is enabled by default on Databricks Runtime 12.2 LTS and above. AQE automatically coalesces and splits shuffle partitions at runtime. Verify it is enabled:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
USE LARGER OR STORAGE-OPTIMIZED INSTANCES
Rather than adding more small worker nodes, switch to fewer but larger instances; this keeps more shuffle traffic node-local instead of crossing the network. Storage-optimized instance types (e.g., the i3/i4i series on AWS, the Lsv3 series on Azure, or n2 instances with local SSDs on GCP) include local NVMe SSDs that provide fast scratch space for shuffle spill.
On AWS, you can also add EBS shuffle volumes in the cluster configuration under Advanced Options to give executors more local disk for shuffle data.
CHECK FOR DATA SKEW
Data skew is one of the most frequent causes of stage materialization failures. If one partition has significantly more data than others, that executor can run out of memory or time out.
To check for skew, open the Spark UI for the failing stage and compare the shuffle read/write sizes across tasks in the stage's task table. If a few tasks show 10x or more data than the median, you have a skew problem.
Mitigations for skew:
- Enable AQE skew join handling (shown above)
- Add a salt column to your join key to distribute skewed keys across multiple partitions
- Filter out or pre-aggregate the skewed keys before the join
ADD EXPLICIT REPARTITIONING BEFORE HEAVY OPERATIONS
If you know which operation is causing the failure, repartition the DataFrame before it:
uniq_rec_df = uniq_rec_df.repartition(500, "your_key_column")
This ensures a more even distribution of data across partitions before the shuffle-heavy step.
INCREASE EXECUTOR MEMORY AND OVERHEAD
If your executors are running out of memory during shuffle, give them more heap and more off-heap overhead. Note that these are cluster-level settings: they take effect only at executor startup and cannot be changed at runtime with spark.conf.set. Set them in the cluster's Spark Config (under Advanced Options on all-purpose clusters, or in the cluster Spark Config of the job definition for job clusters):
spark.executor.memory 16g
spark.executor.memoryOverhead 4g
TUNE SHUFFLE RETRY AND TIMEOUT SETTINGS
These settings make Spark more tolerant of transient shuffle fetch failures. Like the executor memory settings, they must go in the cluster's Spark Config rather than spark.conf.set at runtime:
spark.shuffle.io.maxRetries 10
spark.shuffle.io.retryWait 30s
spark.shuffle.io.connectionTimeout 240s
RECOMMENDED DEBUGGING STEPS
1. Open the Spark UI for the failed job run (Job Runs page, click the run, then click the Spark UI link)
2. Go to the Stages tab and find the failed stage
3. Check the "Failure Reason" column for the specific underlying exception (FetchFailedException, OutOfMemoryError, ExecutorLostFailure, etc.)
4. Check the "Shuffle Read" and "Shuffle Write" sizes to understand the data volume
5. Look at the "Event Timeline" to see if executors were removed or lost during the stage
The specific underlying exception will guide which of the above solutions to prioritize. For example, if you see OutOfMemoryError, focus on memory settings and partitioning. If you see connection timeouts, focus on retry settings and using larger instances with local SSDs.
For more details on cluster sizing for shuffle-heavy workloads, see:
https://docs.databricks.com/en/compute/cluster-config-best-practices.html
For information on Adaptive Query Execution:
https://docs.databricks.com/en/optimizations/aqe.html
* This reply was drafted with an agent system I built, which researches responses using the wide set of documentation I have available and previous memory. I personally review each draft for obvious issues, monitor the system for reliability, and correct the draft when I detect drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.