Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

PySpark Structured Streaming job doesn't unpersist DataFrames

AlessandroM
New Contributor

Hi community,

I am currently developing a PySpark job (running on Databricks Runtime 14.3 LTS) using Structured Streaming.

Our streaming job uses foreachBatch, and inside it we persist (and subsequently unpersist) two DataFrames. However, the Storage tab in the Spark UI shows that these DataFrames are never actually unpersisted, and at the beginning of each batch two new DataFrames are persisted on top of the old ones.
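For context, here is a minimal sketch of that pattern; the rate source, transformations, and checkpoint path are illustrative placeholders, not our actual job:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

def process_batch(batch_df, batch_id):
    # Two derived DataFrames are persisted so several downstream actions
    # within the same micro-batch can reuse them.
    df_a = batch_df.withColumn("doubled", F.col("value") * 2).persist()
    df_b = batch_df.filter(F.col("value") % 2 == 0).persist()
    try:
        df_a.count()  # placeholders for the real downstream writes
        df_b.count()
    finally:
        # Both are unpersisted before the batch function returns, yet the
        # Storage tab keeps accumulating entries batch after batch.
        df_a.unpersist()
        df_b.unpersist()

(spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    .writeStream.foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/demo")
    .start())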

When this job runs, we experience a progressive slowdown of batch processing times even though batch sizes remain constant, and after some time (roughly 8 hours) the driver OOMs and the job is restarted.

We are struggling to identify what is causing this behavior. We have also enabled driver heap dumps to check whether we are causing a memory leak, but the dump files are written out empty.

spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/heapDumps

Does anyone have suggestions on how to identify what is causing this issue, and/or how to solve it?

Solutions tried so far

1. Unpersist

Not working

2. Unpersist with blocking set to true

Not working
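For reference, this is what the blocking variant looks like (df stands in for either of the two persisted DataFrames):

# Waits until the cached blocks are actually removed instead of scheduling
# the removal asynchronously; the Storage tab entries still do not go away.
df.unpersist(blocking=True)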

3. Using catalog API cacheTable instead of persist

 
df.createOrReplaceTempView('view')
spark.catalog.cacheTable('view')

It fails: spark.catalog.cacheTable does not see the temporary view, even though we are in the same SparkSession.

4. Using catalog API cacheTable with a global temporary view

 
df.createOrReplaceGlobalTempView("view")  # the view is registered in the global_temp database
spark.catalog.cacheTable('global_temp.view')

The table is cached, but the subsequent uncacheTable has no effect and the temporary view remains in the Storage tab of the Spark UI:

 
spark.catalog.uncacheTable("global_temp.view")
spark.catalog.dropGlobalTempView("view")  # dropGlobalTempView takes the bare view name
df.unpersist()

5. Using low-level RDD API

 
from pyspark.sql import DataFrame

def _force_unpersist(self, df: DataFrame, df_name: str) -> None:
    # Walk the JVM registry of persistent RDDs and unpersist the first one
    # whose name contains the given view/DataFrame name.
    for rdd_id, rdd in self.spark.sparkContext._jsc.getPersistentRDDs().items():
        if rdd.name() and df_name in rdd.name():
            rdd.unpersist()
            break

self._force_unpersist(df, "global_temp.view")

The RDD entry is no longer visible in the Storage section of the Spark UI, but the memory leak symptoms remain.

 

1 REPLY

BigRoux
Databricks Employee
The issue you’re encountering—where unpersist() does not seem to release memory for persisted DataFrames in your Structured Streaming job—likely relates to nuances of the Spark caching mechanism and how it interacts with the lifecycle of micro-batch execution inside the foreachBatch function.
Here are some considerations to address this issue:
  1. Proper Use of unpersist: Ensure that unpersist() is explicitly called at the right time, after the necessary operations on the persisted DataFrame are completed. Failing to call unpersist immediately after usage can result in memory buildup. Try setting blocking=True, e.g. df.unpersist(blocking=True), to force immediate eviction (see the sketch after this list).
  2. Batch-Specific Variables: Assign unique names to DataFrame variables created in each batch to avoid reference caching issues, as Spark may hold onto references if the variables are reused across batches.
  3. Inspect Executor and Driver Logs: Check the executor and driver logs for any warnings or errors related to memory usage, as they may provide clues about incomplete or delayed cleanup.
  4. Evaluate Cache Scope: Inside foreachBatch, the persistence exists within the scope of that batch execution. However, improper handling of references can cause persistence spill over to subsequent batches, leading to the progressive slowdown you’ve observed.
  5. Memory Cleanup Check: Once the unpersist() is called, verify that the Storage tab in the Spark UI reflects the memory cleanup. There might be a delay in the UI reflecting changes.
  6. DataFrame References and Deserialization: If DataFrames or persisted items in foreachBatch are being deserialized on the executors, evaluate potential situations where references linger beyond their intended scopes, consuming additional memory. This behavior can occur especially in scenarios that involve distributed operations.
  7. Driver Heap Dumps: Although your heap dump files are empty, it might help to analyze logs or further refine the heap dump configuration so you can identify reproducible patterns.
  8. Known Memory Leak in Persistence: There’s an open bug in Spark where persisted DataFrames may not fully release memory even after `unpersist()`, particularly noticeable in long-running streams. See more information here.
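A minimal sketch tying together points 1, 2, and 4 above: batch-scoped variables, persisting only what the batch needs, and a blocking unpersist in a finally block so eviction completes before the next micro-batch starts. The column names and actions here are illustrative assumptions, not taken from your job:

from pyspark.sql import functions as F

def process_batch(batch_df, batch_id):
    enriched = None
    filtered = None
    try:
        # Batch-scoped references, re-created for every micro-batch.
        enriched = batch_df.withColumn("ingest_batch", F.lit(batch_id)).persist()
        filtered = enriched.filter(F.col("value").isNotNull()).persist()

        enriched.count()   # placeholders for the real downstream actions
        filtered.count()
    finally:
        # Blocking unpersist waits until the cached blocks are actually
        # removed instead of scheduling the removal asynchronously.
        for cached in (enriched, filtered):
            if cached is not None:
                cached.unpersist(blocking=True)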

Additional Recommendations:

  • Streaming Query Listener: Consider attaching a streaming query listener (available in PySpark starting with DBR 11.0) to monitor memory utilization and track the lifecycle events of your streaming execution plan; a minimal sketch follows this list.
  • Project Lightspeed Features: Structured Streaming in Databricks Runtime versions 13.1 and above has optimizations for stateful pipelines and adaptive execution which may alleviate some of the runtime coordination issues observed in operations like foreachBatch.
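Here is a minimal listener sketch, assuming the DBR 11.0+ Python listener API; the class name and what it logs are just illustrations:

from pyspark.sql.streaming import StreamingQueryListener

class BatchProgressListener(StreamingQueryListener):
    # Logs per-batch durations so a progressive slowdown (and the batch at
    # which it starts) is visible without digging through the Spark UI.
    def onQueryStarted(self, event):
        print(f"Query started: id={event.id}")

    def onQueryProgress(self, event):
        progress = event.progress
        print(f"batchId={progress.batchId} durationMs={progress.durationMs}")

    def onQueryTerminated(self, event):
        print(f"Query terminated: id={event.id}")

spark.streams.addListener(BatchProgressListener())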
Finally, carefully review your pattern of usage to ensure the streaming query is optimized for long-running tasks. If challenges persist even after the above changes, consulting with Databricks support with a reproducible job configuration may help you uncover deeper platform-specific causes.
 
These are some guides and ideas you can consider, albeit not formal support.
 
Louis.
