Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.

Smaller dataset causing OOM on large cluster

Klusener
Contributor

I have a PySpark job on Databricks reading ~50-55 GB of Parquet data from a Delta table. The cluster uses GCP n2-highmem-4 VMs with autoscaling between 1 and 15 workers. Each n2-highmem-4 worker has 32 GB memory and 4 cores and runs a single executor with 22 GB allocated, i.e. 22 GB * 15 = 330 GB of total executor memory, which seems large enough for ~55 GB of input data. Shuffle partitions are set to 200. But I'm still getting an OOM error.

  • Input data volume: ~55 GB
  • Workers: n2-highmem-4 GCP VMs, autoscaling 1-15 (15 assumed for the calculation)
  • Executors per worker: 1
  • Cores per executor (worker): 4, i.e. only 4 tasks can run in parallel per executor
  • Shuffle partitions: 200
  • Partitions per worker: 200/15 = ~13
  • Data per partition: 55 GB / 200 = ~275 MB (this is just an average; with skew, some partitions will hold much more data. Is there a way to figure this out from the Spark UI?)
  • Overall executor memory: 22 GB * 15 = 330 GB
    • Spark memory (storage + execution) per executor = 0.6 * (22000 MB - 300 MB) = ~13 GB

Could you please help me understand why this is not sufficient and leads to OOM? Also, do all ~13 partitions assigned to an executor need to fit in memory at once, or, since only 4 tasks run in parallel per executor, is it enough for memory to accommodate just 4 partitions at a time?
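For reference, the same arithmetic as a small Python sketch (the 0.6 fraction and the ~300 MB reserved memory are Spark's defaults):

input_mb = 55_000                  # ~55 GB of input data
workers = 15                       # max workers with autoscaling
executor_heap_mb = 22_000          # 22 GB heap per executor, one executor per worker
shuffle_partitions = 200

partitions_per_worker = shuffle_partitions / workers       # ~13
data_per_partition_mb = input_mb / shuffle_partitions      # ~275 MB average, ignoring skew
unified_memory_mb = 0.6 * (executor_heap_mb - 300)         # ~13 GB for storage + execution

print(partitions_per_worker, data_per_partition_mb, unified_memory_mb)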

11 REPLIES

BigRoux
Databricks Employee
The OutOfMemory (OOM) issue you're experiencing in your PySpark job could stem from several factors. Here's a breakdown of potential causes and mitigation strategies:
  1. Skew in Data Partitions:
    • Based on your calculation, the data size per partition is approximately 275 MB. However, due to possible data skew, some partitions could be significantly larger and overwhelm the executor memory. To investigate skew, you can check the Spark UI:
      • Navigate to the "Stages" tab of the Spark UI.
      • For failed stages, examine partition sizes in the stage detail summary.
      • If some partitions are exceptionally large compared to others, this indicates skew.
    • To address skew:
      • Increase the number of shuffle partitions beyond 200 to distribute data more evenly.
      • Use Adaptive Query Execution (AQE), which dynamically coalesces small shuffle partitions and splits skewed ones at runtime. Enable it with spark.conf.set("spark.sql.adaptive.enabled", "true") (see the configuration sketch after this list).
      • Consider using Spark’s skew hints to handle skewed joins or aggregations.
  2. Execution Memory:
    • Unified executor memory is calculated as roughly 0.6 * (availableMemory - reservedMemory), which leaves approximately 13 GB per executor for execution and storage. When tasks processing large partitions need more than their share, Spark spills what it can to disk, and memory that cannot be spilled can still push the executor into OOM.
    • Because only four tasks run in parallel on each executor (four cores per executor), memory may only need to accommodate these four concurrent tasks. However, if any single task exceeds its share of memory, you'll encounter OOM. Ensure partitions are small enough for this allocation.
  3. Storage vs. Execution Memory:
    • Managing memory pressure from intermediate data (shuffle, join, or aggregation buffers) can help reduce OOM issues. You can give the unified (execution + storage) region a larger share of the heap, e.g. spark.conf.set("spark.memory.fraction", "0.8") (also shown in the sketch after this list).
    • Alternatively, forcing intermediate spills to disk earlier (instead of keeping them in-memory) could mitigate constraints.
  4. Cluster Configuration:
    • Evaluate the vertical and horizontal scaling of your cluster. If OOM persists despite partition adjustments, consider increasing the memory for each executor or the number of workers to spread the load more evenly.
    • For instance:
      • If upgrading workers, opt for instance types optimized for memory.
      • If increasing the number of workers, repartition the data to maximize parallelism.
  5. Additional Debugging Tips:
    • Enable more detailed logging and diagnostic tools to pinpoint challenges in specific stages or tasks.
    • Use the Spark SQL and Catalyst optimizations (explain() function) to understand how transformations and actions are executed. An optimal logical and physical plan helps avoid performance bottlenecks.
These steps should help you identify and mitigate the OOM issue affecting your job. As always, iterative tuning and profiling based on the specifics of your workload are key to achieving optimal performance.
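To make points 1 and 3 concrete, here is a minimal configuration sketch; the values are illustrative starting points rather than tuned recommendations, and spark is the active SparkSession on the cluster:

spark.conf.set("spark.sql.adaptive.enabled", "true")            # AQE: split skewed partitions and coalesce small ones at runtime
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")   # handle skewed join partitions
spark.conf.set("spark.sql.shuffle.partitions", "400")           # more, smaller shuffle partitions than the default 200
spark.conf.set("spark.memory.fraction", "0.8")                  # larger unified (execution + storage) region; default is 0.6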
 
Hope this helps, Big Roux.

Klusener
Contributor

Thank you so much for the detailed response, much appreciated. Two follow-up questions:

  1. How do we check the partition size (or skew) for failed tasks from the UI? For example, if I go to the Spark UI for the failed stage, it gives the summary below. It shows 4 tasks as failed, but does not indicate the partition size that caused the OOM.
  2. 'Summary Metrics' shows a max shuffle read size of 838.1 MB. Just curious, isn't that too small to cause OOM?

Klusener_0-1746599429539.png

BigRoux
Databricks Employee

We will get back to you shortly.

mark_ott
Databricks Employee

You need to enable more metrics. Click on the link below and turn on all of the metrics.

mark_ott_0-1746623229451.png

 

mark_ott
Databricks Employee

Oops, that's the wrong pic. Here's the correct one.

metrics.png

mark_ott
Databricks Employee

I'm guessing you are running one or more wide transformations in your query, and that is causing skewed shuffle partitions. Go back to the Stages tab and check the 'Shuffle Write Size / Records' row.

m2.png

mark_ott
Databricks Employee

I'm guessing you have shuffle write sizes that are > 1 GB. That's when things start going down the rathole with spill and OOM. Here are a few questions for you: Is Adaptive Query Execution enabled? Also, I saw in your earlier screenshot that you had some nasty Java garbage collection. Is your cluster Photon-enabled? That can reduce the GC pressure.

mark_ott
Databricks Employee

Other things to consider: by any chance do you have spot instances turned on for your workers (edge case)? I've seen this handcuff AQE. If you have a join, is the smaller table the first table in the JOIN? Have you run ANALYZE TABLE, which can change the join strategy to one that won't go OOM (see the sketch below)? These are some things to consider.
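If it helps, here is a minimal sketch of those last two checks; the table and column names are placeholders, not your actual objects:

from pyspark.sql.functions import broadcast

# Collect statistics so the optimizer can choose a better join strategy (placeholder table names).
spark.sql("ANALYZE TABLE my_schema.big_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE my_schema.small_table COMPUTE STATISTICS")

big_df = spark.table("my_schema.big_table")
small_df = spark.table("my_schema.small_table")

# Explicitly broadcast the smaller table so the join avoids shuffling the large side.
joined = big_df.join(broadcast(small_df), "join_key")   # "join_key" is a placeholder column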

 

Klusener
Contributor

Much appreciated, @mark_ott and @BigRoux, thanks for the prompt response.

The job uses the cluster/settings below:

  • Cluster/Spark version - Driver: n2-highmem-4 · Workers: n2-highmem-4 · 5-15 workers · DBR 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) on GCP
  • Photon is not enabled
  • Spot/preemptible instances are enabled
  • Everything else is default Databricks settings; no configs set explicitly

I just enabled 'Show Additional Metrics' on the stage and am attaching the job/stage/task details from the Spark UI. Only a single job and stage have failed. There is no shuffle write. Isn't AQE enabled by default from Spark 3 onwards?
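For reference, I can check the current values directly on the cluster with something like:

for key in ("spark.sql.adaptive.enabled",
            "spark.sql.adaptive.skewJoin.enabled",
            "spark.sql.shuffle.partitions"):
    print(key, "=", spark.conf.get(key))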

Klusener_0-1746629479189.png

Klusener_1-1746629503205.png

 

Klusener_2-1746629611871.png

mark_ott
Databricks Employee

OK, without having your code or DAG, it's a little difficult to figure this out. But here's something that should work. First, figure out how many memory partitions you have. Apparently your memory partitions are too big for the cluster, hence the OOM. Use this generic code as a template:

num_partitions = df.rdd.getNumPartitions()
print(num_partitions)

mark_ott
Databricks Employee

Next, use repartition(n) to increase your DataFrame to twice the number you got earlier. For example, if num_partitions was 30, then repartition(60) prior to running your query. With half the data in each memory partition, I'm guessing you won't OOM. If you still do, increase the number by 2x again until the OOM disappears.
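Here is a minimal sketch of that doubling approach, assuming df is the DataFrame feeding the failing stage:

num_partitions = df.rdd.getNumPartitions()   # e.g. 30
df = df.repartition(num_partitions * 2)      # halve the data per memory partition (e.g. 30 -> 60)
# Re-run the query; if it still OOMs, double again (x4, x8, ...) until the error disappears.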
