2 weeks ago
I have a Spark pipeline which reads selected data from table_1 into a view, performs a few aggregations via GROUP BY in the next step, and writes to a target table. table_1 is large, ~30GB of compressed CSV.
create or replace temporary view base_data as select /* distinct */ a.*, cast(timestamp_field as date) as date... from table_1 a where <basic_condition>
select <few columns> ,sum(..),max(..) from base_data group by <few columns>
This works fine. But the input table table_1 contains duplicates. When I enable the DISTINCT clause (currently commented out) in the view, the job fails due to an out-of-memory (OOM) error. Since GROUP BY also involves shuffling, why does adding DISTINCT cause the job to fail? I went through this StackOverflow discussion, but that question isn't about comparing the two over the same set of columns.
2 weeks ago
Hi @Klusener
Distinct is a very expensive operation. For your case, I recommend one of the deduplication strategies below.
Most efficient method - deduplicate on the key columns only:
df_deduped = df.dropDuplicates(subset=['unique_key_columns'])
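As a minimal sketch of how this could slot into your pipeline (the table and column names are placeholders for your real keys):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Keep one row per business key; "key_col_1" and "key_col_2" are hypothetical placeholders.
df_deduped = spark.table("table_1").dropDuplicates(["key_col_1", "key_col_2"])

# Register the deduplicated data as the view used by the downstream GROUP BY step.
df_deduped.createOrReplaceTempView("base_data")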
For a more complex dedupe process - partition and filter based on rank:
WITH ranked_data AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY unique_key_columns ORDER BY timestamp DESC) as rnk
FROM table_1
)
SELECT * FROM ranked_data WHERE rnk = 1
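If you would rather stay in the DataFrame API, a rough PySpark equivalent of the SQL above (column names are placeholders for your real keys) could look like this:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Keep only the most recent row per key; "key_col" and "timestamp" are placeholders.
w = Window.partitionBy("key_col").orderBy(F.col("timestamp").desc())

deduped = (
    spark.table("table_1")
         .withColumn("rnk", F.row_number().over(w))
         .filter(F.col("rnk") == 1)
         .drop("rnk")
)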
Alternatively, you can increase the executor memory or use a memory-optimized cluster when configuring the job compute.
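If you do go down the memory route on Databricks, the more common approach is simply to pick a larger or memory-optimized worker type; but if you set the properties explicitly, they go in the cluster's Spark config (the values below are purely illustrative, not recommendations):
spark.executor.memory 10g
spark.executor.memoryOverhead 2g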
Let me know if you have any other questions; otherwise, please mark this as a solution. Cheers!
2 weeks ago
Thanks for the response. Could you please elaborate on why DISTINCT is such an expensive operation? From my understanding it's similar to a GROUP BY, where Spark hashes the key to shuffle the data and eliminate duplicates. Why does adding just DISTINCT cause the job to fail with such a huge overrun? If you look at the query in step 2, it has a GROUP BY on a few columns (not all of the fields in the DISTINCT) to compute the sum and max metrics, and that works fine.
2 weeks ago
When you apply DISTINCT, Spark performs a shuffle to eliminate duplicate rows. This involves hashing every column of each row, moving full rows across the network so that identical rows land in the same partition, and then comparing rows within each partition to drop the duplicates.
If your dataset is large (30GB compressed CSV), the amount of shuffle data can be enormous. Additionally, Spark needs to hold intermediate results in memory to deduplicate the rows, which could lead to OOM errors if the dataset is too large for the available memory.
GROUP BY also involves a shuffle, but its behavior differs: Spark applies partial (map-side) aggregation before the shuffle, collapsing each group to a single row per partition, so only the grouping columns plus the aggregate values are moved rather than full rows.
Since aggregations reduce the amount of data during the shuffle stage (e.g., combining rows into a single aggregate result for each group), a GROUP BY on a few columns typically uses far less memory than a DISTINCT over every column.
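You can see the difference in the physical plans yourself. In the sketch below (column names are placeholders), both queries compile to hash aggregates, but the DISTINCT is keyed on every column of the row, so full rows go through the exchange and are held in the aggregation hash tables:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.table("base_data")  # the view from step 1

# DISTINCT: HashAggregate keyed on *all* columns; full rows go through the exchange.
df.distinct().explain()

# GROUP BY a few columns: partial (map-side) aggregation collapses each group to
# one small row per partition before the exchange.
df.groupBy("col_a", "col_b").agg(F.sum("metric"), F.max("metric")).explain()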
Hope this helps!
Mark it as solution if you found it helpful.
Regards,
Avinash N
2 weeks ago
Thanks, Avinash, for the detailed response. I'm using n2-highcpu-16 workers with 16GB each, 10 workers for 160GB in total. Memory is 7616M per executor, i.e. roughly 76GB of executor memory across the cluster. Shouldn't that be enough to handle the 30GB of compressed CSV for the DISTINCT? Also, why would a huge amount of shuffle data be an issue with respect to OOM?
2 weeks ago
Your setup (160GB total memory across 10 workers with 16GB per node and 7.6GB per executor) is substantial, but running out of memory during a DISTINCT operation can still occur due to how Spark handles shuffle data and intermediate processing during deduplication.
While 30GB compressed data might seem manageable given your cluster's resources, several factors can impact memory usage during Spark operations:
1. Decompression Overhead: 30GB of compressed CSV can expand to several times that size once it is decompressed and deserialized into Spark's internal row format, so the in-memory footprint is far larger than the 30GB on disk.
2. Shuffle Stage Memory Usage: a DISTINCT over all columns shuffles full rows; shuffle buffers and the hash tables used for deduplication live in executor memory before spilling to disk, so a few skewed or oversized partitions can exhaust a 7.6GB heap (see the sketch after this list for one way to relieve this).
3. Executor Memory Overhead: only part of spark.executor.memory is actually available for execution and storage (Spark's unified memory pool), and shuffle transfer buffers draw on the separate overhead allocation, so the usable headroom is smaller than the headline numbers suggest.
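If you want to keep the deduplication, here is a minimal sketch of two mitigations that are often combined: deduplicate on the key columns only (as suggested earlier) and raise the shuffle partition count so each deduplication partition stays small. The partition count and column names below are illustrative placeholders, not recommendations:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# More, smaller shuffle partitions keep each per-partition hash table small.
# 800 is illustrative; the default is 200 -- tune it to your data volume.
spark.conf.set("spark.sql.shuffle.partitions", "800")

# Deduplicate on the business key columns rather than every column of the row.
deduped = spark.table("table_1").dropDuplicates(["key_col_1", "key_col_2"])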
2 weeks ago - last edited 2 weeks ago
Much appreciated. Lastly, before closing: isn't overhead memory outside of spark.executor.memory (7.6GB)? If spark.executor.memory is 7.6GB, is that dedicated to storage/execution, with some reserved? Since the total memory per machine is 16GB and 7.6GB of it is spark.executor.memory, would the difference (16GB − 7.6GB) be for off-heap?
2 weeks ago
Yes and no. Overhead memory (used for things like off-heap storage, shuffle buffers, and JVM/native overhead) is separate from spark.executor.memory, but it is not simply whatever is left of the 16GB.
Let's break this down clearly. Each executor's total memory consists of two components:
1. spark.executor.memory (7616M in your case): the JVM heap, used for execution and storage, with roughly 300MB reserved by Spark itself.
2. spark.executor.memoryOverhead: off-heap memory for shuffle buffers, native libraries, and JVM overhead. By default it is max(384MB, 10% of spark.executor.memory).
The total memory allocated per executor is the sum of the two. Given the setup you described, that is roughly 7616M heap + ~762M overhead ≈ 8378M (about 8.2GB) per executor. The rest of each 16GB node is left for the operating system and the Databricks services running on the worker, so the difference (16GB − 7.6GB) is not all available to Spark as off-heap memory.
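As a quick back-of-the-envelope check, a tiny sketch of that arithmetic (assuming the default overhead of max(384MB, 10% of spark.executor.memory)):
# Per-executor memory breakdown for spark.executor.memory = 7616M
executor_memory_mb = 7616
overhead_mb = max(384, round(0.10 * executor_memory_mb))  # default overhead factor is 0.10
total_per_executor_mb = executor_memory_mb + overhead_mb
print(overhead_mb, total_per_executor_mb)  # ~762MB overhead, ~8378MB (~8.2GB) per executor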
Mark it as solution if this helps.
Regards,
Avinash N