<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Out of Memory after adding distinct operation in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106979#M42668</link>
    <description>&lt;P&gt;Your setup (160GB total memory across 10 workers, with 16GB per node and 7.6GB per executor) is substantial, but a DISTINCT operation can still run out of memory because of &lt;STRONG&gt;how Spark handles shuffle data&lt;/STRONG&gt; and &lt;STRONG&gt;intermediate processing during deduplication&lt;/STRONG&gt;.&lt;/P&gt;&lt;P&gt;While 30GB of compressed data might seem manageable given your cluster's resources, several factors affect memory usage during Spark operations:&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;1. Decompression Overhead&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Compressed data inflates significantly when decompressed. For example, a 30GB compressed CSV might expand to 100GB+ in memory, depending on the compression ratio and the data structure.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;STRONG&gt;2. Shuffle Stage Memory Usage&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;When Spark executes the &lt;STRONG&gt;DISTINCT&lt;/STRONG&gt; operation, it groups the rows by every column: it shuffles them by a hash of the full row and then aggregates each partition to find the unique rows (falling back to sorting when the hash map does not fit in memory). This involves holding a significant portion of the data in memory and spilling the excess to disk. Large shuffle files can overwhelm both memory and I/O bandwidth.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;STRONG&gt;3. Executor Memory Overhead&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Of the 7.6GB per executor, only part is available for processing: Spark reserves 300MB, and by default only a fraction (spark.memory.fraction, 0.6) of the rest forms the pool shared by execution (including shuffle and aggregation buffers) and cached data. That pool is further divided among the tasks running concurrently on the executor. If a single executor gets overloaded (which can happen with skewed data), it may run out of memory even though the cluster as a whole has sufficient memory.&lt;/LI&gt;&lt;/UL&gt;</description>
    <pubDate>Sat, 25 Jan 2025 06:06:10 GMT</pubDate>
    <dc:creator>Avinash_Narala</dc:creator>
    <dc:date>2025-01-25T06:06:10Z</dc:date>
    <item>
      <title>Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106934#M42648</link>
      <description>&lt;P&gt;I have a Spark pipeline that reads selected data from table_1 into a temporary view, performs a few aggregations via GROUP BY in the next step, and writes the result to a target table. table_1 is large: ~30GB of compressed CSV.&lt;/P&gt;&lt;H1&gt;Step-1:&lt;/H1&gt;&lt;PRE&gt;create or replace temporary view base_data as
select /* distinct */ a.*, cast(timestamp_field as date) as date...
from table_1 a
where &amp;lt;basic_condition&amp;gt;
&lt;/PRE&gt;&lt;H1&gt;Step-2:&lt;/H1&gt;&lt;PRE&gt;select &amp;lt;few columns&amp;gt;, sum(..), max(..)
from base_data
group by &amp;lt;few columns&amp;gt;
&lt;/PRE&gt;&lt;H1&gt;Step-3: Write to target&lt;/H1&gt;&lt;P&gt;This works fine. However, the input table table_1 contains duplicates. When I enable the DISTINCT clause (currently commented out) in the view, the job fails with an out-of-memory (OOM) error. Since GROUP BY also involves shuffling, why does adding DISTINCT cause the job to fail? I went through this StackOverflow &lt;A href="https://stackoverflow.com/questions/52274221/why-is-groupby-a-lot-faster-than-distinct-in-pyspark" target="_blank" rel="noopener"&gt;discussion&lt;/A&gt;, but it compares the two operations on the same set of columns, which is not my situation.&lt;/P&gt;</description>
      <pubDate>Fri, 24 Jan 2025 17:37:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106934#M42648</guid>
      <dc:creator>Klusener</dc:creator>
      <dc:date>2025-01-24T17:37:11Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106945#M42654</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/69270"&gt;@Klusener&lt;/a&gt;&lt;/P&gt;&lt;P&gt;DISTINCT can be an expensive operation on wide tables because it groups by every column. For your case, I recommend one of the deduplication strategies below.&lt;/P&gt;&lt;P&gt;Simplest method (when a few key columns identify a duplicate):&lt;/P&gt;&lt;PRE&gt;df_deduped = df.dropDuplicates(subset=['unique_key_columns'])&lt;/PRE&gt;&lt;P&gt;For a more complex dedupe process, partition by the key columns, rank the rows, and keep the top-ranked row:&lt;/P&gt;&lt;PRE&gt;WITH ranked_data AS (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY unique_key_columns ORDER BY timestamp DESC) AS rnk
  FROM table_1
)
SELECT * FROM ranked_data WHERE rnk = 1&lt;/PRE&gt;&lt;P&gt;Alternatively, you can increase the executor memory or choose a memory-optimized instance type when configuring the job compute.&lt;/P&gt;&lt;P&gt;Let me know if you have any questions; otherwise, &lt;STRONG&gt;please mark it as a solution.&lt;/STRONG&gt; Cheers!&lt;/P&gt;</description>
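The two strategies above keep different rows, which is easy to miss. The plain-Python sketch below (not Spark code) mimics what each one returns on a small in-memory list. The (key, timestamp, payload) row layout and the helper names are hypothetical, and in a real Spark job which duplicate dropDuplicates keeps is not guaranteed.

```python
from typing import Dict, Hashable, List, Tuple

# Hypothetical row layout: (key, timestamp, payload).
Row = Tuple[Hashable, int, str]


def keep_latest_per_key(rows: List[Row]) -> List[Row]:
    """Mimic ROW_NUMBER() OVER (PARTITION BY key ORDER BY timestamp DESC) ... WHERE rnk = 1."""
    latest: Dict[Hashable, Row] = {}
    for row in rows:
        current = latest.get(row[0])
        # Keep the row with the greatest timestamp; on ties the first one seen wins
        # (Spark's ROW_NUMBER breaks ties in no guaranteed order).
        if current is None or row[1] > current[1]:
            latest[row[0]] = row
    return list(latest.values())


def drop_duplicates_by_key(rows: List[Row]) -> List[Row]:
    """Rough analogue of df.dropDuplicates(subset=[key]): keep one row per key."""
    first_seen: Dict[Hashable, Row] = {}
    for row in rows:
        # Here it is the first row in list order; in Spark it is whichever row
        # the aggregation happens to see first.
        first_seen.setdefault(row[0], row)
    return list(first_seen.values())


rows = [("a", 1, "old"), ("b", 5, "only"), ("a", 3, "new"), ("a", 2, "mid")]
print(sorted(keep_latest_per_key(rows)))     # [('a', 3, 'new'), ('b', 5, 'only')]
print(sorted(drop_duplicates_by_key(rows)))  # [('a', 1, 'old'), ('b', 5, 'only')]
```

The window-function version costs an extra sort within each key, but it lets you decide which duplicate survives.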
      <pubDate>Fri, 24 Jan 2025 18:32:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106945#M42654</guid>
      <dc:creator>MadhuB</dc:creator>
      <dc:date>2025-01-24T18:32:09Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106973#M42662</link>
      <description>&lt;P&gt;Thanks for the response. Could you please elaborate on why DISTINCT is an expensive operation? From my understanding, it's similar to a GROUP BY, where Spark likely hashes the key to shuffle the data and eliminate duplicates. Why does adding just DISTINCT cause a job failure/huge overrun? If you look at the query in Step-2, it has a GROUP BY on a few columns (not all of the fields covered by DISTINCT) to compute sum and max metrics, and that works fine.&lt;/P&gt;</description>
      <pubDate>Sat, 25 Jan 2025 04:10:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106973#M42662</guid>
      <dc:creator>Klusener</dc:creator>
      <dc:date>2025-01-25T04:10:16Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106974#M42663</link>
      <description>&lt;H3&gt;How DISTINCT Works in Spark&lt;/H3&gt;&lt;P&gt;When you apply DISTINCT, Spark treats it as a GROUP BY on every column and performs a shuffle to eliminate duplicate rows. This involves:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;STRONG&gt;Grouping&lt;/STRONG&gt; the rows by a hash of all columns (falling back to sorting when memory runs short) to identify unique rows.&lt;/LI&gt;&lt;LI&gt;Moving data across partitions (shuffle) based on its content.&lt;/LI&gt;&lt;LI&gt;Writing intermediate results to memory/disk.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;If your dataset is large (30GB compressed CSV), the amount of shuffle data can be enormous. Additionally, Spark needs to hold intermediate results in memory to deduplicate the rows, which can lead to OOM errors when a partition is too large for the available memory.&lt;/P&gt;&lt;H3&gt;How GROUP BY Works in Spark&lt;/H3&gt;&lt;P&gt;GROUP BY also involves shuffling, but its behavior can differ:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;It shuffles data to group by the specified columns.&lt;/LI&gt;&lt;LI&gt;Aggregations like SUM and MAX are partially computed on each input partition before the shuffle (map-side partial aggregation), reducing the amount of data that needs to be shuffled and stored in memory.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Since these aggregations collapse many rows into a single partial result per group before the shuffle, a GROUP BY on a few columns typically uses far less memory than DISTINCT.&lt;/P&gt;&lt;H3&gt;Why DISTINCT Causes OOM&lt;/H3&gt;&lt;OL&gt;&lt;LI&gt;&lt;STRONG&gt;Intermediate Data Volume&lt;/STRONG&gt;: DISTINCT also runs a partial aggregation, but its grouping key is the entire row. Only exact copies that sit in the same input partition are removed before the shuffle, so nearly every row is shuffled, and every unique row becomes its own hash-map entry on the receiving side.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Skewed Data&lt;/STRONG&gt;: If your data is skewed (some keys or rows appear much more frequently than others), the DISTINCT operation can overload certain partitions, causing uneven memory usage and potential OOM errors.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Large Shuffle Size&lt;/STRONG&gt;: DISTINCT can produce a much larger shuffle than GROUP BY, especially when rows are wide and mostly unique, because partial aggregation then removes little data before the shuffle.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Hope this helps!&lt;/P&gt;&lt;P&gt;Mark it as a solution if you found it helpful.&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Avinash N&lt;/P&gt;</description>
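The difference between the two can be illustrated with a small plain-Python simulation (not Spark code) that counts how many records each query sends through the shuffle. It assumes Spark's usual physical plan of a partial hash aggregate on each input partition before the exchange; the table layout, partition count, and data are made up for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical table layout: (region, order_id, amount).
Row = Tuple[str, str, int]


def shuffled_rows_group_by(partitions: List[List[Row]]) -> int:
    """Records sent to the shuffle by GROUP BY region with SUM/MAX(amount).

    Each map task pre-aggregates its own partition, so it emits one
    (region, partial_sum, partial_max) record per region it saw.
    """
    total = 0
    for part in partitions:
        partial: Dict[str, Tuple[int, int]] = {}
        for region, _, amount in part:
            s, m = partial.get(region, (0, amount))
            partial[region] = (s + amount, max(m, amount))
        total += len(partial)
    return total


def shuffled_rows_distinct(partitions: List[List[Row]]) -> int:
    """Records sent to the shuffle by SELECT DISTINCT *.

    The grouping key is the whole row, so the map-side step can only drop
    exact copies that happen to sit in the same input partition.
    """
    return sum(len(set(part)) for part in partitions)


# 4,000 unique orders over 3 regions, split into 4 input partitions; each
# partition also receives 200 copies of rows from the next partition.
regions = ["east", "west", "north"]
base = [(regions[i % 3], f"o{i}", i % 50) for i in range(4000)]
partitions = [base[k * 1000:(k + 1) * 1000] for k in range(4)]
for k in range(4):
    nxt = (k + 1) % 4
    partitions[k] = partitions[k] + base[nxt * 1000:nxt * 1000 + 200]

print(sum(len(p) for p in partitions))          # 4800 input rows
print(shuffled_rows_group_by(partitions))       # 12 records shuffled
print(shuffled_rows_distinct(partitions))       # 4800 records shuffled
print(len({r for p in partitions for r in p}))  # 4000 unique rows after DISTINCT
```

Both queries read the same 4,800 rows, but the GROUP BY ships only a dozen partial results, while DISTINCT ships every row and then has to keep 4,000 full rows in hash maps on the receiving side.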
      <pubDate>Sat, 25 Jan 2025 05:36:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106974#M42663</guid>
      <dc:creator>Avinash_Narala</dc:creator>
      <dc:date>2025-01-25T05:36:25Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106977#M42666</link>
      <description>&lt;P&gt;Thanks, Avinash, for the detailed response. I'm using n2-highcpu-16 workers with 16GB each; with 10 workers that is 160GB in total. Executor memory is 7616M per executor, i.e. about 76GB of executor memory in total (10 × 7616M). Shouldn't that be able to handle the 30GB of compressed CSV data for DISTINCT? Additionally, why would a large amount of shuffle data cause an OOM?&lt;/P&gt;</description>
      <pubDate>Sat, 25 Jan 2025 05:52:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106977#M42666</guid>
      <dc:creator>Klusener</dc:creator>
      <dc:date>2025-01-25T05:52:02Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106979#M42668</link>
      <description>&lt;P&gt;Your setup (160GB total memory across 10 workers, with 16GB per node and 7.6GB per executor) is substantial, but a DISTINCT operation can still run out of memory because of &lt;STRONG&gt;how Spark handles shuffle data&lt;/STRONG&gt; and &lt;STRONG&gt;intermediate processing during deduplication&lt;/STRONG&gt;.&lt;/P&gt;&lt;P&gt;While 30GB of compressed data might seem manageable given your cluster's resources, several factors affect memory usage during Spark operations:&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;1. Decompression Overhead&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Compressed data inflates significantly when decompressed. For example, a 30GB compressed CSV might expand to 100GB+ in memory, depending on the compression ratio and the data structure.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;STRONG&gt;2. Shuffle Stage Memory Usage&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;When Spark executes the &lt;STRONG&gt;DISTINCT&lt;/STRONG&gt; operation, it groups the rows by every column: it shuffles them by a hash of the full row and then aggregates each partition to find the unique rows (falling back to sorting when the hash map does not fit in memory). This involves holding a significant portion of the data in memory and spilling the excess to disk. Large shuffle files can overwhelm both memory and I/O bandwidth.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;STRONG&gt;3. Executor Memory Overhead&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Of the 7.6GB per executor, only part is available for processing: Spark reserves 300MB, and by default only a fraction (spark.memory.fraction, 0.6) of the rest forms the pool shared by execution (including shuffle and aggregation buffers) and cached data. That pool is further divided among the tasks running concurrently on the executor. If a single executor gets overloaded (which can happen with skewed data), it may run out of memory even though the cluster as a whole has sufficient memory.&lt;/LI&gt;&lt;/UL&gt;</description>
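Point 3 is easier to see with numbers. The sketch below estimates how much execution memory one task can get under Spark's unified memory manager, assuming the default spark.memory.fraction of 0.6, the fixed 300MB reservation, no cached data, and 16 concurrent tasks per executor (one per vCPU of an n2-highcpu-16 worker; the real slot count depends on the cluster configuration). The function name is illustrative.

```python
from typing import Tuple

RESERVED_MB = 300  # fixed reservation in Spark's unified memory manager


def per_task_execution_memory_mb(executor_memory_mb: float, concurrent_tasks: int,
                                 memory_fraction: float = 0.6) -> Tuple[float, float]:
    """Return (minimum before spilling, maximum) execution memory per task, in MB.

    Assumes nothing is cached, so execution can use the whole unified pool.
    Spark aims to give each of N active tasks between 1/(2N) and 1/N of it.
    """
    unified_pool = (executor_memory_mb - RESERVED_MB) * memory_fraction
    return unified_pool / (2 * concurrent_tasks), unified_pool / concurrent_tasks


# 7616M heap (from this thread); 16 concurrent tasks is an assumption.
low, high = per_task_execution_memory_mb(7616, concurrent_tasks=16)
print(f"per task: about {low:.0f} MB to {high:.0f} MB")  # about 137 MB to 274 MB
```

A few hundred megabytes per task is small compared with a hash map of full, wide rows, which helps explain why one large or skewed partition can run into trouble even though the cluster has plenty of memory in total.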
      <pubDate>Sat, 25 Jan 2025 06:06:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106979#M42668</guid>
      <dc:creator>Avinash_Narala</dc:creator>
      <dc:date>2025-01-25T06:06:10Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106989#M42669</link>
      <description>&lt;P&gt;Much appreciated. One last question before closing: isn't overhead memory outside of spark.executor.memory (7.6GB)? If spark.executor.memory is 7.6GB, is it dedicated to storage/execution, apart from some reserved memory? The machine has 16GB in total, of which 7.6GB is spark.executor.memory, so the difference (16GB - 7.6GB) would be for off-heap use.&lt;/P&gt;</description>
      <pubDate>Sat, 25 Jan 2025 08:06:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106989#M42669</guid>
      <dc:creator>Klusener</dc:creator>
      <dc:date>2025-01-25T08:06:16Z</dc:date>
    </item>
    <item>
      <title>Re: Out of Memory after adding distinct operation</title>
      <link>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106990#M42670</link>
      <description>&lt;P&gt;Yes, &lt;STRONG&gt;overhead memory&lt;/STRONG&gt; (used for JVM overheads, interned strings, and other native allocations, e.g. buffers used during shuffle) is separate from spark.executor.memory.&lt;/P&gt;&lt;P&gt;Let's break this down clearly:&lt;/P&gt;&lt;H3&gt;&lt;STRONG&gt;1. Memory Breakdown in Spark Executors&lt;/STRONG&gt;&lt;/H3&gt;&lt;P&gt;Each executor's total memory consists of the following components:&lt;/P&gt;&lt;H4&gt;&lt;STRONG&gt;a) Spark Executor Memory (spark.executor.memory)&lt;/STRONG&gt;:&lt;/H4&gt;&lt;P&gt;The JVM heap. Spark reserves 300MB of it; by default, spark.memory.fraction (0.6) of the remainder forms the unified pool shared by execution (shuffles, joins, aggregations) and storage (cached data), and the rest is user memory.&lt;/P&gt;&lt;H4&gt;&lt;STRONG&gt;b) Spark Memory Overhead (spark.executor.memoryOverhead)&lt;/STRONG&gt;:&lt;/H4&gt;&lt;P&gt;Default value = max(384MB, 0.1 * spark.executor.memory), configurable using spark.executor.memoryOverhead.&lt;/P&gt;&lt;H4&gt;&lt;STRONG&gt;c) Total Executor Memory&lt;/STRONG&gt;:&lt;/H4&gt;&lt;P&gt;The total memory allocated per executor is the sum of the two:&lt;/P&gt;&lt;PRE&gt;Total Executor Memory = spark.executor.memory + spark.executor.memoryOverhead&lt;/PRE&gt;&lt;HR /&gt;&lt;H3&gt;&lt;STRONG&gt;2. Cluster Memory Configuration in Your Case&lt;/STRONG&gt;&lt;/H3&gt;&lt;P&gt;Given the setup you described:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;STRONG&gt;Machine Memory&lt;/STRONG&gt;: 16GB per worker node.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Spark Executor Memory&lt;/STRONG&gt;: 7.6GB per executor (spark.executor.memory).&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Available for Overhead&lt;/STRONG&gt;: The remaining memory on the machine after accounting for spark.executor.memory.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Let's calculate the breakdown:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;STRONG&gt;Executor JVM Memory&lt;/STRONG&gt;: 7.6GB is allocated to spark.executor.memory (the JVM heap).&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Overhead Memory&lt;/STRONG&gt;:&lt;UL&gt;&lt;LI&gt;By default, spark.executor.memoryOverhead is max(384MB, 0.1 * spark.executor.memory), i.e., max(384MB, 0.76GB) = 0.76GB in your case.&lt;/LI&gt;&lt;LI&gt;This leaves 16GB - (7.6GB + 0.76GB) = ~7.64GB for the operating system and other processes on the node.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Mark it as a solution if this helps.&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Avinash N&lt;/P&gt;</description>
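The arithmetic in section 2 can be reproduced with a short sketch. It uses the default formula quoted in the post; note that the Spark documentation describes spark.executor.memoryOverhead for cluster managers such as YARN and Kubernetes, and Databricks sizes executors itself, so treat the output as an approximation. Values are in MB, with the node's 16GB taken as 16 x 1024 MB; the function name is illustrative.

```python
from typing import Dict

MIN_OVERHEAD_MB = 384   # floor of the default spark.executor.memoryOverhead
OVERHEAD_FACTOR = 0.10  # default factor (spark.executor.memoryOverheadFactor in newer Spark)


def executor_footprint_mb(executor_memory_mb: float, node_memory_mb: float) -> Dict[str, float]:
    """Default overhead, total executor footprint, and what is left on the node (MB)."""
    overhead = max(MIN_OVERHEAD_MB, OVERHEAD_FACTOR * executor_memory_mb)
    total = executor_memory_mb + overhead
    return {
        "overhead": overhead,
        "total_executor": total,
        "left_on_node": node_memory_mb - total,
    }


# 7616M heap on a 16GB node, as described in this thread.
for name, mb in executor_footprint_mb(7616, 16 * 1024).items():
    print(f"{name:15s} {mb:8.1f} MB")
```

With these inputs the overhead is about 762MB (the 0.76GB in the post) and roughly 8,000MB remains on the node; the post's ~7.64GB comes from treating 16GB as 16,000MB and 7616M as 7.6GB.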
      <pubDate>Sat, 25 Jan 2025 08:26:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/out-of-memory-after-adding-distinct-operation/m-p/106990#M42670</guid>
      <dc:creator>Avinash_Narala</dc:creator>
      <dc:date>2025-01-25T08:26:36Z</dc:date>
    </item>
  </channel>
</rss>

