Hi community,
I am currently developing a PySpark job (running on Databricks Runtime 14.3 LTS) using Structured Streaming.
Our streaming job uses foreachBatch, and inside it we call persist (and later unpersist) on two DataFrames. The Storage tab of the Spark UI shows that these DataFrames are never actually unpersisted, and at the start of each batch two new DataFrames are persisted on top of the old ones.
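A stripped-down sketch of the pattern we use (function and variable names are placeholders, not our real code):

```python
def process_batch(batch_df, batch_id):
    # Cache the micro-batch because it is reused by several actions below.
    cached = batch_df.persist()
    try:
        cached.count()  # materialize the cache once before reuse
        # ... several transformations/actions reusing `cached` ...
    finally:
        # blocking=True asks Spark to wait until the blocks are dropped
        cached.unpersist(blocking=True)

# Wired up as: stream.writeStream.foreachBatch(process_batch).start()
```

Even with the unpersist in a finally block and blocking=True, the entries accumulate in the Storage tab.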
When this job runs, we see a progressive slowdown of batch processing times, even though batch sizes remain constant. After roughly 8 hours the driver OOMs and the job is restarted.
We are struggling to identify what is causing this behavior. We have also enabled driver heap dumps to check for memory leaks on our side, but the dump files are written out empty.
spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/heapDumps
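One possible explanation for the empty dump files (an assumption, not something we have confirmed): the JVM writes the heap dump directly through the DBFS FUSE mount, which does not handle that write pattern well. Pointing HeapDumpPath at the driver's local disk first, then copying the file to DBFS afterwards, may be worth trying; the `/local_disk0` path below is an assumption about the local SSD mount.

```
spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/local_disk0/heapDumps
```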
Does anyone have suggestions on how to identify the cause of this issue, and/or how to solve it?
Solutions tried so far
1. Unpersist
df.unpersist(): no effect, the entries stay in the Storage tab.
2. Unpersist with blocking set to true
df.unpersist(blocking=True): no effect either.
3. Using catalog API cacheTable instead of persist
df.createOrReplaceTempView('view')
spark.catalog.cacheTable('view')
It fails: spark.catalog.cacheTable does not see the temporary view, even though we appear to be in the same SparkSession.
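One thing we suspect (hedged, based on our reading of how foreachBatch works): the micro-batch DataFrame handed to foreachBatch is bound to a cloned SparkSession, so a temp view created on it is not visible to the driver-level `spark` variable. Going through `batch_df.sparkSession` instead might be the fix; the view name below is illustrative:

```python
def cache_via_catalog(batch_df, view_name="batch_view"):
    # Use the session the micro-batch DataFrame is actually bound to,
    # not the driver-level `spark` variable.
    session = batch_df.sparkSession
    batch_df.createOrReplaceTempView(view_name)
    session.catalog.cacheTable(view_name)
    return session

def uncache_via_catalog(session, view_name="batch_view"):
    # Uncache and drop on the same session the view was registered on.
    session.catalog.uncacheTable(view_name)
    session.catalog.dropTempView(view_name)
```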
4. Using catalog API cacheTable with a global temporary view
df.createOrReplaceGlobalTempView("view")  # unqualified name; Spark registers it under global_temp
spark.catalog.cacheTable('global_temp.view')
The table is cached, but the subsequent uncacheTable has no effect and the temporary view remains in the Storage tab of the Spark UI:
spark.catalog.uncacheTable("global_temp.view")
spark.catalog.dropGlobalTempView("view")  # catalog API, takes the unqualified name
df.unpersist()
5. Using low-level RDD API
def _force_unpersist(self, df: DataFrame, df_name: str) -> None:
    # Walk the JVM-side registry of persistent RDDs and unpersist by name.
    for rdd_id, rdd in self.spark.sparkContext._jsc.getPersistentRDDs().items():
        if rdd.name() is not None and df_name in rdd.name():
            rdd.unpersist()
            break

self._force_unpersist(df, "global_temp.view")
The RDD entry no longer appears in the Storage tab of the Spark UI, but the memory-leak symptoms remain.
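To narrow this down, we are now logging what is still cached at the end of each batch, using the same internal `_jsc` hook as in attempt 5 (internal API, so this is a diagnostic sketch rather than something we would keep in production):

```python
def log_persistent_rdds(spark):
    # Return (id, name, storage level) for every RDD Spark still considers
    # persisted; a list that grows batch over batch confirms the leak.
    entries = []
    for rdd_id, rdd in spark.sparkContext._jsc.getPersistentRDDs().items():
        entries.append((rdd_id, rdd.name(), rdd.getStorageLevel().toString()))
    return entries
```

Calling this at the end of process_batch and diffing successive results shows which entries survive the unpersist calls.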