Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Out of Memory after adding distinct operation

Klusener
New Contributor III

I have a Spark pipeline that reads selected data from table_1 as a view, performs a few aggregations via GROUP BY in the next step, and writes to a target table. table_1 is large: ~30GB of compressed CSV.

Step-1:

create or replace temporary view base_data as 
select /* distinct */ a.*, cast(timestamp_field as date) as date...
from table_1 a
where <basic_condition>

Step-2:

select <few columns> ,sum(..),max(..)
from base_data
group by <few columns>

Step-3: Write to target

This works fine. But the input table table_1 contains duplicates. When I enable the DISTINCT clause (currently commented out) in the view, the job fails with an out-of-memory (OOM) error. Since GROUP BY also involves shuffling, why does adding DISTINCT cause the job to fail? I went through this StackOverflow discussion, but that question doesn't compare the two over the same set of columns.

7 REPLIES

MadhuB
Contributor

Hi @Klusener 

DISTINCT is a very expensive operation. For your case, I recommend using either of the deduplication strategies below.

Most efficient method:
df_deduped = df.dropDuplicates(subset=['unique_key_columns'])

For a complex dedupe process - partition and filter based on the row rank:

WITH ranked_data AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY unique_key_columns ORDER BY timestamp DESC) AS rnk
  FROM table_1
)
SELECT * FROM ranked_data WHERE rnk = 1

Alternatively, you can increase the executor memory or use a memory-optimized cluster when configuring the job compute (a rough sketch follows).
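
For the compute option, here is a hypothetical sketch of a memory-optimized job cluster spec, written as a Python dict in the shape of the Jobs API new_cluster block. The node type, DBR version, and overhead value are placeholders, and Databricks normally derives spark.executor.memory from the node type itself:

new_cluster = {
    "spark_version": "15.4.x-scala2.12",   # placeholder Databricks Runtime version
    "node_type_id": "n2-highmem-16",       # placeholder memory-optimized node (vs. n2-highcpu-16)
    "num_workers": 10,
    "spark_conf": {
        # optional override; only needed if the default overhead proves too small
        "spark.executor.memoryOverhead": "2g",
    },
}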

Let me know if you have any questions; otherwise, please mark this as a solution. Cheers!

Klusener
New Contributor III

Thanks for the response. Could you please elaborate on why DISTINCT is an expensive operation? From my understanding, it's similar to a GROUP BY, where Spark likely hashes the key to shuffle the data and eliminate duplicates. Why does adding just DISTINCT cause the job to fail with such a huge overrun? If you look at the query in step-2, it groups by a few columns (not all of the fields covered by DISTINCT) to compute the sum and max metrics, and that works fine.

Avinash_Narala
Valued Contributor II

How DISTINCT Works in Spark

When you apply DISTINCT, Spark performs a shuffle to eliminate duplicate rows. This involves:

  1. Sorting the data or grouping it to identify unique rows.
  2. Moving data across partitions (shuffle) based on its content.
  3. Writing intermediate results to memory/disk.

If your dataset is large (30GB compressed CSV), the amount of shuffle data can be enormous. Additionally, Spark needs to hold intermediate results in memory to deduplicate the rows, which could lead to OOM errors if the dataset is too large for the available memory.

How GROUP BY Works in Spark

GROUP BY also involves shuffling, but its behavior can differ:

  1. It shuffles data to group by the specified columns.
  2. Aggregations like SUM, MAX, etc., are applied during the shuffle process, reducing the amount of data that needs to be shuffled and stored in memory.

Since aggregations reduce the amount of data during the shuffle stage (e.g., combining rows into a single aggregate result for each group), GROUP BY typically uses less memory than DISTINCT.
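
As a minimal PySpark sketch of the difference (the path and column names below are placeholders, not taken from the original pipeline):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header", True).csv("/path/to/table_1")   # placeholder source

# DISTINCT: every column is part of the grouping key, so full-width rows are shuffled
# and must be compared across partitions before duplicates can be dropped.
deduped = df.distinct()

# GROUP BY + aggregates: each partition is first reduced to at most one narrow row per
# group (partial aggregation), so far less data crosses the shuffle.
aggregated = (
    df.groupBy("col_a", "col_b")                      # placeholder grouping columns
      .agg(F.sum("amount").alias("total_amount"),     # placeholder metric column
           F.max("amount").alias("max_amount"))
)

# Comparing the physical plans (Exchange and HashAggregate stages) makes the contrast visible.
deduped.explain()
aggregated.explain()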

Why DISTINCT Causes OOM

  1. Intermediate Data Volume: Unlike GROUP BY, which reduces data during aggregation, DISTINCT must hold all unique rows in memory until deduplication is complete.
  2. Skewed Data: If your data is skewed (some keys or rows appear far more frequently than others), the DISTINCT operation can overload certain partitions, causing uneven memory usage and potential OOM errors (a quick check is sketched after this list).
  3. Large Shuffle Size: DISTINCT can create a larger shuffle size compared to GROUP BY, especially when the data contains many duplicates.
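
A quick way to check for that kind of skew is to count how often the candidate key values repeat. A hedged sketch; the key column below is a placeholder:

from pyspark.sql import functions as F

# Rows per candidate key: very large counts for a handful of keys indicate skewed partitions.
(spark.table("table_1")
      .groupBy("unique_key_columns")      # placeholder key column(s)
      .count()
      .orderBy(F.desc("count"))
      .show(20, truncate=False))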

Hope this helps!

Mark it as solution if you found it helpful.

Regards,

Avinash N

Klusener
New Contributor III

Appreciate the detailed response, Avinash. I'm using n2-highcpu-16 nodes, which have 16GB each, with 10 workers for 160GB in total. Memory is 7616M per executor, i.e., about 76GB of executor memory in total. Would that not be able to handle the ~30GB of compressed CSV data for DISTINCT? Additionally, why would huge shuffle data be an issue with respect to OOM?

Avinash_Narala
Valued Contributor II

Your setup (160GB total memory across 10 workers with 16GB per node and 7.6GB per executor) is substantial, but running out of memory during a DISTINCT operation can still occur due to how Spark handles shuffle data and intermediate processing during deduplication.

While 30GB compressed data might seem manageable given your cluster's resources, several factors can impact memory usage during Spark operations:

1. Decompression Overhead:

  • Compressed data inflates significantly upon decompression. For example, a 30GB compressed CSV might expand to 100GB+ in memory, depending on the compression ratio and data structure.

2. Shuffle Stage Memory Usage:

  • When Spark executes the DISTINCT operation, it shuffles and sorts data to identify unique rows. This involves holding a significant portion of the data in memory and spilling excess data to disk. Large shuffle files can overwhelm both memory and I/O bandwidth.

3. Executor Memory Overhead:

  • With 7.6GB of memory per executor, a portion is reserved for JVM overhead and shuffle memory buffers, leaving less memory available for processing. If a single executor gets overloaded (which can happen with skewed data), it may run out of memory despite the cluster having sufficient total memory.

Klusener
New Contributor III

Much appreciated. Lastly, before closing: isn't overhead memory outside of spark.executor.memory (7.6GB)? If spark.executor.memory is 7.6GB, is that dedicated to storage/execution with some portion reserved? Since 16GB is the total memory per machine and 7.6GB of it is spark.executor.memory, would the difference (16GB - 7.6GB) be for off-heap?

Avinash_Narala
Valued Contributor II

Yes, overhead memory (used for things like off-heap storage and shuffle operations) is separate from spark.executor.memory.

Let's break this down clearly:

1. Memory Breakdown in Spark Executors

Each executor's total memory consists of the following components:

a) Spark Executor Memory (spark.executor.memory): the JVM heap available to the executor. Spark splits this heap internally into a small reserved portion and the unified execution/storage region (controlled by spark.memory.fraction, default 0.6), with the remainder serving as user memory.

b) Spark Memory Overhead (spark.executor.memoryOverhead): additional memory outside the heap for JVM overheads, off-heap allocations, and shuffle buffers.

Default value = max(384MB, 0.1 * spark.executor.memory).

Configurable using spark.executor.memoryOverhead.

c) Total Executor Memory:

The total memory allocated per executor is the sum of the two:

Total Executor Memory = spark.executor.memory + spark.executor.memoryOverhead

2. Cluster Memory Configuration in Your Case

Given the setup you described:

  • Machine Memory: 16GB per worker node.
  • Spark Executor Memory: 7.6GB per executor (spark.executor.memory).
  • Available for Overhead: The remaining memory on the machine after accounting for spark.executor.memory.

Let’s calculate the breakdown:

  1. Executor JVM Memory: 7.6GB is reserved for spark.executor.memory.
  2. Overhead Memory:
    • By default, spark.executor.memoryOverhead is max(384MB, 0.1 * spark.executor.memory), i.e., max(384MB, 0.76GB) = 0.76GB in your case.
    • This leaves 16GB - (7.6GB + 0.76GB) = ~7.64GB for the OS, YARN, or other processes.
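
As a rough back-of-envelope sketch of where the 7.6GB heap itself goes (assuming the stock defaults of ~300MB reserved memory and spark.memory.fraction = 0.6; Databricks may tune these differently):

# Approximate per-executor memory split, in MB (defaults assumed, not measured)
executor_memory = 7616                                 # spark.executor.memory
memory_overhead = max(384, 0.10 * executor_memory)     # default overhead formula (~762 MB)
reserved = 300                                         # reserved heap memory (assumed default)
memory_fraction = 0.6                                  # spark.memory.fraction (assumed default)

unified_memory = (executor_memory - reserved) * memory_fraction       # execution + storage
user_memory = (executor_memory - reserved) * (1 - memory_fraction)    # user data structures

print(f"overhead (off-heap)   ~{memory_overhead:.0f} MB")
print(f"unified exec/storage  ~{unified_memory:.0f} MB")
print(f"user memory           ~{user_memory:.0f} MB")
print(f"total per container   ~{executor_memory + memory_overhead:.0f} MB")

Under these assumptions, only about 4.3GB per executor is actually available for execution and storage, which is why a wide-row DISTINCT can spill heavily or hit OOM even on this cluster.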

Mark it as solution if this helps.

Regards,

Avinash N
