<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: PySpark Structured Streaming job doesn't unpersist DataFrames in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/pyspark-structured-streaming-job-doesn-t-unpersist-dataframes/m-p/117160#M45441</link>
    <description>Re: PySpark Structured Streaming job doesn't unpersist DataFrames in Data Engineering</description>
    <pubDate>Wed, 30 Apr 2025 13:51:24 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-04-30T13:51:24Z</dc:date>
    <item>
      <title>PySpark Structured Streaming job doesn't unpersist DataFrames</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-structured-streaming-job-doesn-t-unpersist-dataframes/m-p/117151#M45437</link>
      <description>&lt;P&gt;Hi community,&lt;/P&gt;&lt;P&gt;I am currently developing a PySpark job (running on Databricks Runtime 14.3 LTS) using Structured Streaming.&lt;/P&gt;&lt;P&gt;Our streaming job uses &lt;CODE&gt;foreachBatch&lt;/CODE&gt;, and inside it we call &lt;CODE&gt;persist&lt;/CODE&gt; (with a subsequent &lt;CODE&gt;unpersist&lt;/CODE&gt;) on two DataFrames. The Storage tab in the Spark UI shows that these DataFrames are never actually unpersisted, and at the beginning of each batch we see two new DataFrames being persisted.&lt;/P&gt;&lt;P&gt;When this job runs, we experience a progressive slowdown of batch processing times, despite batch sizes remaining constant. After roughly 8 hours the driver OOMs and the job is restarted.&lt;/P&gt;&lt;P&gt;We are struggling to identify what is causing this behavior. We have also enabled driver heap dumps to check whether we are causing memory leaks, but these files are saved empty.&lt;/P&gt;&lt;PRE&gt;spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/heapDumps&lt;/PRE&gt;&lt;P&gt;Does anyone have suggestions on how to identify what is causing this issue, and/or how to solve it?&lt;/P&gt;&lt;H2 id="user-content-solutions-tried-so-far"&gt;Solutions tried so far&lt;/H2&gt;&lt;H3 id="user-content-1.-unpersist"&gt;1. Unpersist&lt;/H3&gt;&lt;P&gt;Not working.&lt;/P&gt;&lt;H3 id="user-content-2.-unpersist-with-blocking-set-to-%60true%60"&gt;2. Unpersist with blocking set to true&lt;/H3&gt;&lt;P&gt;Not working.&lt;/P&gt;&lt;H3 id="user-content-3.-using-catalog-api-%60cachetable%60-instead-of-%60persist%60"&gt;3. Using catalog API cacheTable instead of persist&lt;/H3&gt;&lt;PRE&gt;df.createOrReplaceTempView('view')
spark.catalog.cacheTable('view')&lt;/PRE&gt;&lt;P&gt;It fails: spark.catalog.cacheTable does not see the temporary view, even though we are in the same SparkSession.&lt;/P&gt;&lt;H3 id="user-content-4.-using-catalog-api-%60cachetable%60-with-a-%60global-temporary-view%60"&gt;4. Using catalog API cacheTable with a global temporary view&lt;/H3&gt;&lt;PRE&gt;df.createOrReplaceGlobalTempView("global_temp.view")
spark.catalog.cacheTable('global_temp.view')&lt;/PRE&gt;&lt;P&gt;The table is cached, but the subsequent uncacheTable has no effect and the temporary view remains in the Storage tab of the Spark UI:&lt;/P&gt;&lt;PRE&gt;spark.catalog.uncacheTable("global_temp.view")
spark.catalog.dropGlobalTempView("global_temp.view")
df.unpersist()&lt;/PRE&gt;&lt;H3 id="user-content-5.-using-low-level-rdd-api"&gt;5. Using low-level RDD API&lt;/H3&gt;&lt;PRE&gt;def _force_unpersist(self, df: DataFrame, df_name: str) -&amp;gt; None:
    for (rdd_id, rdd) in self.spark.sparkContext._jsc.getPersistentRDDs().items():
        if df_name in rdd.name():
            rdd.unpersist()
            break

self._force_unpersist(df, "global_temp.view")&lt;/PRE&gt;&lt;P&gt;The RDD entry is no longer visible in the Storage section of the Spark UI, but the memory-leak symptoms remain.&lt;/P&gt;</description>
      <pubDate>Wed, 30 Apr 2025 12:34:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-structured-streaming-job-doesn-t-unpersist-dataframes/m-p/117151#M45437</guid>
      <dc:creator>AlessandroM</dc:creator>
      <dc:date>2025-04-30T12:34:08Z</dc:date>
    </item>
    <item>
      <title>Re: PySpark Structured Streaming job doesn't unpersist DataFrames</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-structured-streaming-job-doesn-t-unpersist-dataframes/m-p/117160#M45441</link>
      <description>&lt;DIV class="paragraph"&gt;The issue you’re encountering—where &lt;CODE&gt;unpersist()&lt;/CODE&gt; does not seem to release memory for persisted DataFrames in your Structured Streaming job—likely relates to nuances of the Spark caching mechanism and how it interacts with the lifecycle of micro-batch execution inside the &lt;CODE&gt;foreachBatch&lt;/CODE&gt; function.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Here are some considerations to address this issue:&lt;/DIV&gt;
&lt;OL start="1"&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Proper Use of &lt;CODE&gt;unpersist&lt;/CODE&gt;:&lt;/STRONG&gt; Ensure that &lt;CODE&gt;unpersist()&lt;/CODE&gt; is explicitly called at the right time after the necessary operations on the persisted DataFrame are completed. Sometimes, failure to call &lt;CODE&gt;unpersist&lt;/CODE&gt; immediately after usage can result in memory buildup. Try setting "blocking=true", e.g.&amp;nbsp;df.unpersist(blocking=True) # Force immediate eviction&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Batch-Specific Variables:&lt;/STRONG&gt; Assign unique names to DataFrame variables created in each batch to avoid reference caching issues, as Spark may hold onto references if the variables are reused across batches.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Inspect Executor and Driver Logs:&lt;/STRONG&gt; Check the executor and driver logs for any warnings or errors related to memory usage, as they may provide clues about incomplete or delayed cleanup.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Evaluate Cache Scope:&lt;/STRONG&gt; Inside &lt;CODE&gt;foreachBatch&lt;/CODE&gt;, the persistence exists within the scope of that batch execution. However, improper handling of references can cause persistence spill over to subsequent batches, leading to the progressive slowdown you’ve observed.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Memory Cleanup Check:&lt;/STRONG&gt; Once the &lt;CODE&gt;unpersist()&lt;/CODE&gt; is called, verify that the Storage tab in the Spark UI reflects the memory cleanup. There might be a delay in the UI reflecting changes.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;DataFrame References and Deserialization:&lt;/STRONG&gt; If DataFrames or persisted items in &lt;CODE&gt;foreachBatch&lt;/CODE&gt; are being deserialized on the executors, evaluate potential situations where references linger beyond their intended scopes, consuming additional memory. This behavior can occur especially in scenarios that involve distributed operations.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Driver Heap Dumps:&lt;/STRONG&gt; Although your heap dump files are empty, it might help to analyze logs or further refine configurations for enabling heap dumps to identic reproducible patterns.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Known Memory Leak in Persistence:&lt;/STRONG&gt; There’s an open bug in Spark where persisted DataFrames may not fully release memory even after &lt;CODE&gt;unpersist()&lt;/CODE&gt;, which is particularly noticeable in long-running streams. See more information &lt;A href="https://community.databricks.com/t5/data-engineering/oom-issue-in-streaming-with-foreachbatch/td-p/71493" target="_self"&gt;here&lt;/A&gt;.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/OL&gt;
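&lt;DIV class="paragraph"&gt;As a minimal sketch of points 1 and 2 (the &lt;CODE&gt;process_batch&lt;/CODE&gt;, &lt;CODE&gt;source_df&lt;/CODE&gt;, and &lt;CODE&gt;target_table&lt;/CODE&gt; names are illustrative assumptions, not your actual job): keep the persisted DataFrame local to the batch function and unpersist it synchronously before the batch returns.&lt;/DIV&gt;
&lt;PRE&gt;def process_batch(batch_df, batch_id):
    # Batch-local reference: nothing outside this function holds on to it
    batch_local = batch_df.persist()
    try:
        batch_local.write.mode("append").saveAsTable("target_table")
    finally:
        # blocking=True waits until the cached blocks are actually removed
        batch_local.unpersist(blocking=True)
        # Heavier fallback if cached entries still accumulate (see point 8):
        # spark.catalog.clearCache()

query = source_df.writeStream.foreachBatch(process_batch).start()&lt;/PRE&gt;
&lt;DIV class="paragraph"&gt;And for point 7, a possible configuration sketch (the &lt;CODE&gt;/local_disk0&lt;/CODE&gt; path is an assumption about your cluster layout): dump to the node’s local disk instead of the FUSE mount, then copy the &lt;CODE&gt;.hprof&lt;/CODE&gt; file off for analysis.&lt;/DIV&gt;
&lt;PRE&gt;spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/local_disk0&lt;/PRE&gt;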
&lt;H4&gt;Additional Recommendations:&lt;/H4&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Streaming Query Listener:&lt;/STRONG&gt; Consider enabling a streaming query listener (available in PySpark starting with DBR 11.0) to monitor memory utilization and track the lifecycle events of your streaming execution plan.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Project Lightspeed Features:&lt;/STRONG&gt; Structured Streaming in Databricks Runtime versions 13.1 and above has optimizations for stateful pipelines and adaptive execution which may alleviate some of the runtime coordination issues observed in operations like &lt;CODE&gt;foreachBatch&lt;/CODE&gt;.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
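&lt;DIV class="paragraph"&gt;A minimal listener sketch (the class name and what it logs are illustrative choices); the per-batch durations reported by &lt;CODE&gt;onQueryProgress&lt;/CODE&gt; make a progressive slowdown easy to spot:&lt;/DIV&gt;
&lt;PRE&gt;from pyspark.sql.streaming import StreamingQueryListener

class BatchProgressLogger(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        # durationMs breaks down where each micro-batch spends its time
        p = event.progress
        print(f"Batch {p.batchId}: durations(ms)={p.durationMs}")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(BatchProgressLogger())&lt;/PRE&gt;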
&lt;DIV class="paragraph"&gt;Finally, carefully review your pattern of usage to ensure the streaming query is optimized for long-running tasks. If challenges persist even after the above changes, consulting with Databricks support with a reproducible job configuration may help you uncover deeper platform-specific causes.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;These are some guides and ideas you can consider albeit not formal support.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Louis.&lt;/DIV&gt;</description>
      <pubDate>Wed, 30 Apr 2025 13:51:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-structured-streaming-job-doesn-t-unpersist-dataframes/m-p/117160#M45441</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-04-30T13:51:24Z</dc:date>
    </item>
  </channel>
</rss>

