Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks PySpark error: OutOfMemoryError: GC overhead limit exceeded

Sunflower7500
New Contributor II

I have a Databricks pyspark query that has been running fine for the last two weeks but am now getting the following error despite no changes to the query: OutOfMemoryError: GC overhead limit exceeded.

I have done some research on possible solutions and have tried adding in partitions to see if that would help but it did not.

My current cluster configurations are as follows:

[Screenshot of cluster configuration: Sunflower7500_0-1738624317697.png]

 

Are there any other ways I can troubleshoot this error?

Here is the query I am running:

 

 

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when, sum, row_number

# Add a unique row ID column using row_number
window_spec = Window.orderBy(lit(1))
df_bs_exploded1 = df_bs_exploded.withColumn("unique_row_id", row_number().over(window_spec))

# Step 1: Aggregate df_final to get the total allocation quantity per key
df_allocations = df_final.groupBy(
    "sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"
).agg(
    sum("allocation_quantity").alias("total_allocation_quantity")
)

# Step 2: Join df_bs_exploded with df_allocations
df_joined = df_bs_exploded1.join(
    df_allocations,
    on=["sap_sales_order_number", "product_end_customer_id", "marketing_sub_code", "fiscal_quarter"],
    how="left"
)

# Step 3: Add a row number for each partition to track allocation order
window_spec_bs1 = Window.partitionBy(
    "sap_sales_order_number", "product_end_customer_id", "marketing_sub_code","fiscal_quarter"
).orderBy(lit(1))

df_with_row_number = df_joined.withColumn("row_number", row_number().over(window_spec_bs1))

# Step 4: Allocate rows based on total_allocation_quantity
df_allocated = df_with_row_number.withColumn(
    "allocated",
    when(
        (col("row_number") <= col("total_allocation_quantity")) & col("total_allocation_quantity").isNotNull(),
        "Yes"
    ).otherwise("No")
)

# Step 5: Add an allocated_quantity column to track allocated amounts
df_final_bs_result = df_allocated.withColumn(
    "allocated_quantity",
    when(col("allocated") == "Yes", 1).otherwise(lit(None))
)

# Drop the temporary row_number column if no longer needed
df_final_bs_result = df_final_bs_result.drop("row_number")

# Display the final result
display(df_final_bs_result)

 

 

And here is the error message I am getting:

 

 

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 761.0 failed 4 times, most recent failure: Lost task 1.3 in stage 761.0 (TID 8251) (100.65.19.63 executor 11): java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at org.apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at org.apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:513)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.generate_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.hashAgg_doAggregateWithKeysOutput_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$2492/1771619670.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1927/946116243.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:225)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
    at org.apache.spark.scheduler.Task$$Lambda$1907/230573460.apply(Unknown Source)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at com.databricks.unity.HandleImpl$$Lambda$1908/336068323.apply(Unknown Source)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
    at org.apache.spark.scheduler.Task$$Lambda$1871/401348611.apply(Unknown Source)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)

 

 

 

4 REPLIES

saurabh18cs
Valued Contributor III

The OutOfMemoryError: GC overhead limit exceeded error in Spark typically indicates that the JVM garbage collector is spending too much time trying to free up memory and is unable to do so effectively.

 

Do you have large data volumes?

  1. Increase executor memory or switch to memory-optimized compute, so that each executor can handle larger data volumes.
  2. Set spark.sql.shuffle.partitions to auto.
  3. Cache your DataFrames to avoid recomputation:

df_bs_exploded1.cache()
df_allocations.cache()
df_joined.cache()
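As a rough sketch of suggestions 2 and 3 (not a guaranteed fix), the helpers below set the shuffle partition count to auto and cache a DataFrame. The function names are hypothetical, and the "auto" value assumes Databricks Runtime; open-source Spark expects an integer for this setting. Note that cache() is lazy, so an action such as count() is needed to actually fill the cache, and caching only pays off when the same DataFrame is reused by more than one action.

```python
def enable_auto_shuffle_partitions(spark):
    """Let the runtime choose the number of shuffle partitions.

    Assumption: the value "auto" is accepted on Databricks Runtime;
    open-source Spark requires an integer here.
    """
    spark.conf.set("spark.sql.shuffle.partitions", "auto")


def cache_and_materialize(df):
    """Mark df for caching and run one action so the cache is filled."""
    cached = df.cache()  # lazy: nothing is stored yet
    cached.count()       # action that populates the cache
    return cached


def release(df):
    """Free the cached blocks once df is no longer needed."""
    df.unpersist()


# Hypothetical usage in the notebook from the question:
#   enable_auto_shuffle_partitions(spark)
#   df_allocations = cache_and_materialize(df_allocations)
#   ... run the join and the windows ...
#   release(df_allocations)
```

Releasing cached DataFrames matters on a shared cluster, because cached blocks otherwise keep occupying executor memory after the notebook cell has finished.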

Thank you for the suggestions.

Yes, I have about 60 million rows of data. I found that when I comment out the last line, display(df_final_bs_result), the code runs fine. So it seems the error is triggered by this operation, since it attempts to collect and materialize the entire DataFrame. I'll try caching to see if that works as a workaround too.
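One thing worth checking here: Spark transformations are lazy, so with display() commented out the cell may contain no action at all, and the aggregation, join, and windows may simply never run. The stack trace also shows the failure on an executor inside a shuffle write (ShuffleMapTask), not while results are being returned to the notebook. A minimal sketch for testing this, assuming Spark 3.0 or later, is to run the full plan against the built-in noop sink, which executes everything but writes nothing. The second helper is a possible alternative for the first step of the query: Window.orderBy(lit(1)) with no partitionBy moves all rows into a single partition, whereas monotonically_increasing_id() produces unique (but not consecutive) IDs without that shuffle. Both function names are hypothetical.

```python
def run_full_plan(df):
    """Execute the whole plan behind df without displaying or storing rows.

    Assumption: Spark 3.0+, where the "noop" data source is available.
    """
    df.write.format("noop").mode("overwrite").save()


def add_unique_row_id(df, name="unique_row_id"):
    """Add a unique 64-bit ID column without a global window.

    The IDs are unique and increasing but not consecutive, so this is
    only a substitute for row_number() when gaps are acceptable.
    """
    return df.selectExpr("*", f"monotonically_increasing_id() AS `{name}`")


# Hypothetical usage in the notebook from the question:
#   df_bs_exploded1 = add_unique_row_id(df_bs_exploded)
#   ... steps 1 to 5 unchanged ...
#   run_full_plan(df_final_bs_result)
```

If run_full_plan(df_final_bs_result) fails with the same error, display() is not the cause, and the memory pressure comes from the query plan itself.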

loic
New Contributor III

When you say:
 "I have a Databricks pyspark query that has been running fine for the last two weeks but am now getting the following error despite no changes to the query: OutOfMemoryError: GC overhead limit exceeded."
Can you tell us how you execute your query?
Is it a notebook that is always executed on the same all-purpose compute?
If yes, I think you are facing the same issue as me. When I run my notebook once and then give the compute time to shut down, it is OK. But if I run my notebook several times without giving the cluster time to shut down, then after several runs it crashes. (I don't use cache at all, so no memory leak here.)

Sunflower7500
New Contributor II

Thanks for the response. It is being run in a notebook connected to all-purpose compute that is also used by other notebooks and connections (like Power BI).
