PySpark Structured Streaming job doesn't unpersist DataFrames
04-30-2025 05:34 AM
Hi community,
I am currently developing a PySpark job (running on Databricks Runtime 14.3 LTS) that uses Structured Streaming.
Our streaming job uses foreachBatch, and inside it we call persist (and later unpersist) on two DataFrames. The Storage tab in the Spark UI shows that these DataFrames are never actually unpersisted: at the beginning of each batch, two more persisted DataFrames appear.
As the job runs, batch processing times slow down progressively even though batch sizes stay constant. Eventually (after roughly 8 hours) the driver runs out of memory (OOM) and the job is restarted.
We are struggling to identify the cause of this behavior. We have also enabled driver heap dumps to check whether we are leaking memory, but the resulting dump files are empty:
spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/heapDumps
Does anyone have suggestions on how to identify the cause of this issue, or how to solve it?
Solutions tried so far
1. Unpersist
Not working
2. Unpersist with blocking set to true
Not working
3. Using catalog API cacheTable instead of persist
df.createOrReplaceTempView('view')
spark.catalog.cacheTable('view')
It fails: spark.catalog.cacheTable does not see the temporary view, even though we are in the same SparkSession.
4. Using catalog API cacheTable with a global temporary view
df.createOrReplaceGlobalTempView("view")  # referenced afterwards as global_temp.view
spark.catalog.cacheTable("global_temp.view")
The table is cached, but the subsequent uncacheTable has no effect and the view remains listed in the Storage tab of the Spark UI:
spark.catalog.uncacheTable("global_temp.view")
spark.catalog.dropGlobalTempView("view")  # dropGlobalTempView is a Catalog method, not a DataFrame method
df.unpersist()
5. Using low-level RDD API
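from pyspark.sql import DataFrame

def _force_unpersist(self, df: DataFrame, df_name: str) -> None:
    # Private API: maps RDD id -> JavaRDD for every RDD the driver tracks as persisted
    persistent_rdds = self.spark.sparkContext._jsc.getPersistentRDDs()
    for rdd in persistent_rdds.values():
        name = rdd.name()  # None for RDDs that were never given a name
        if name and df_name in name:
            rdd.unpersist()
            break

# Called from another method of the same class:
self._force_unpersist(df, "global_temp.view")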
The RDD entry no longer appears in the Storage tab of the Spark UI, but the memory leak symptoms remain.
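Regarding attempt 3 (cacheTable on a temporary view): Structured Streaming runs each micro-batch against a session cloned from the one that started the query, so a temp view registered on the batch DataFrame is most likely visible through batch_df.sparkSession rather than through the outer spark object. A minimal sketch under that assumption; the helper name process_with_cached_view, the view name and the work callback are hypothetical:

```python
from typing import Any, Callable


def process_with_cached_view(
    batch_df: Any, view_name: str, work: Callable[[Any], None]
) -> None:
    """Cache a micro-batch through a temp view registered in the batch's own session.

    Assumption: inside foreachBatch, batch_df is bound to a session cloned from
    the one that started the query, so all catalog calls go through
    batch_df.sparkSession instead of the outer `spark` object.
    """
    session = batch_df.sparkSession
    batch_df.createOrReplaceTempView(view_name)
    session.catalog.cacheTable(view_name)
    try:
        # Read the view back through the same session that registered it.
        work(session.table(view_name))
    finally:
        # Release the cached data and the view even if `work` raises.
        session.catalog.uncacheTable(view_name)
        session.catalog.dropTempView(view_name)
```

It could be wired in as, for example, stream_df.writeStream.foreachBatch(lambda df, batch_id: process_with_cached_view(df, "batch_view", my_logic)).start(), where stream_df and my_logic stand in for the real stream and batch logic.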
04-30-2025 06:51 AM
If unpersist() does not seem to release memory for the persisted DataFrames in your Structured Streaming job, the cause is likely related to nuances of Spark's caching mechanism and how it interacts with the lifecycle of micro-batch execution inside the foreachBatch function. Some things to check:
- Proper use of unpersist: Make sure unpersist() is called explicitly, at the right time, once all operations on the persisted DataFrame have completed. Sometimes, failing to call unpersist right after use can lead to memory buildup. You can also force immediate eviction with blocking=True, e.g. df.unpersist(blocking=True).
- Batch-specific variables: Give the DataFrame variables created in each batch unique names to avoid reference-caching issues, since Spark may hold on to references if the same variables are reused across batches.
- Inspect executor and driver logs: Check the executor and driver logs for warnings or errors related to memory usage; they may point to incomplete or delayed cleanup.
- Evaluate cache scope: Inside foreachBatch, persisted data is meant to live only for the duration of that batch. However, if references are not released properly, the persisted data can carry over into subsequent batches, leading to the progressive slowdown you have observed.
- Memory cleanup check: After calling unpersist(), verify that the Storage tab in the Spark UI reflects the cleanup. The UI may take a moment to show the change.
- DataFrame references and deserialization: If DataFrames or persisted items used in foreachBatch are deserialized on the executors, look for cases where references outlive their intended scope and keep consuming memory. This can happen especially in scenarios that involve distributed operations.
- Driver heap dumps: Even though your heap dump files are empty, it may help to analyze the logs or refine the heap dump configuration so that you can identify reproducible patterns.
- Known memory leak in persistence: There is an open bug in Spark where persisted DataFrames may not fully release memory even after `unpersist()`, which is particularly noticeable in long-running streams. See more information here.
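The advice to call unpersist right after use, with blocking=True, can be enforced structurally with a context manager, so that every persist inside foreachBatch is paired with a blocking unpersist even when the batch logic raises. A minimal sketch that relies only on the DataFrame's own persist() and unpersist(blocking=True) methods; the names persisted and process_batch are hypothetical, and the count() call stands in for real batch logic:

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional


@contextmanager
def persisted(df: Any, storage_level: Optional[Any] = None) -> Iterator[Any]:
    """Persist `df` for the duration of a with-block, then unpersist it.

    The unpersist runs in `finally`, so the cached blocks are released even
    when the code inside the block raises.
    """
    cached = df.persist() if storage_level is None else df.persist(storage_level)
    try:
        yield cached
    finally:
        # blocking=True waits until the cached blocks are removed before returning.
        cached.unpersist(blocking=True)


def process_batch(batch_df: Any, batch_id: int) -> None:
    """Hypothetical foreachBatch function; replace the body with real batch logic."""
    with persisted(batch_df) as cached:
        # Reuse `cached` for several actions; the cache is dropped when the block exits.
        cached.count()
```

It would be registered with stream_df.writeStream.foreachBatch(process_batch).start(), where stream_df is a placeholder for the streaming DataFrame. Because the cached DataFrame only exists inside the with-block, no reference to it survives into the next batch.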
Additional Recommendations:
- Streaming query listener: Consider enabling a streaming query listener (available in PySpark starting with DBR 11.0) to monitor memory utilization and track the lifecycle events of your streaming execution plan.
- Project Lightspeed features: Structured Streaming in Databricks Runtime 13.1 and above includes optimizations for stateful pipelines and adaptive execution, which may alleviate some of the runtime coordination issues observed in operations like foreachBatch.
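To put a number on the progressive slowdown described in the question, the per-batch durations that Structured Streaming already reports can be checked for an upward trend. The sketch below assumes the dict shape returned by StreamingQuery.recentProgress in PySpark (each entry has "batchId" and a "durationMs" dict with a "triggerExecution" key); the helper name batch_duration_trend is hypothetical. It fits an ordinary least-squares slope of trigger execution time against batch id:

```python
from typing import Any, Dict, List


def batch_duration_trend(progress: List[Dict[str, Any]]) -> float:
    """Least-squares slope of triggerExecution time (ms) per batch id.

    A clearly positive slope while input sizes stay constant suggests that
    batches are getting slower over time. Returns 0.0 when there are fewer
    than two usable data points.
    """
    points = [
        (p["batchId"], p["durationMs"]["triggerExecution"])
        for p in progress
        if "triggerExecution" in p.get("durationMs", {})
    ]
    n = len(points)
    if n < 2:
        return 0.0
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    if sxx == 0:
        return 0.0
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    return sxy / sxx
```

Usage would be something like slope_ms = batch_duration_trend(query.recentProgress), logged periodically from the driver. Note that recentProgress only keeps the last spark.sql.streaming.numRecentProgressUpdates entries (100 by default), so the trend covers a recent window rather than the whole run.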