Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

OOM Issue in Streaming with foreachBatch()

dzsuzs
New Contributor II
I have a stateless streaming application that uses foreachBatch. The batch function executes between 10 and 400 times per hour, depending on custom logic.
 
The logic within foreachBatch includes (a rough sketch of the batch function follows the list):
  1. Calling collect() on very small DataFrames (a few megabytes); the driver has more than 20 GB of memory, so this alone should not be an issue
  2. Caching DataFrames and then unpersisting them
  3. Converting a single row to a DataFrame
  4. Performing a cross join on a very small DataFrame
  5. Various filtering operations
  6. Writing the DataFrame to the target_table in append mode.
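
For reference, the batch function has roughly this shape (heavily simplified; target_table is the real table name, but source_df, the checkpoint path, and column names like device_id and value are placeholders):

```python
from pyspark.sql import DataFrame, Row, SparkSession

spark = SparkSession.builder.getOrCreate()

def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    # (2) Cache the micro-batch because it is reused several times below.
    batch_df.cache()
    try:
        # (1) collect() a very small slice (a few MB at most) to the driver
        #     for some custom driver-side logic.
        small_rows = batch_df.select("device_id", "value").limit(100).collect()

        # (3) Convert a single row back into a DataFrame.
        params_df = spark.createDataFrame([Row(batch_id=batch_id)])

        # (4) Cross join the tiny parameter DataFrame onto the batch.
        enriched_df = batch_df.crossJoin(params_df)

        # (5) Various filtering operations.
        filtered_df = enriched_df.filter("value IS NOT NULL")

        # (6) Append the result to the target table.
        filtered_df.write.mode("append").saveAsTable("target_table")
    finally:
        # (2) Unpersist once the micro-batch has been written.
        batch_df.unpersist()

query = (
    source_df.writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", "/tmp/checkpoints/target_table")
        .start()
)
```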
The driver memory usage gradually increases over a few days until it eventually hits a Driver Out of Memory (OOM) error.
 
  • When does Spark remove state from the driver metadata in a streaming application? Are there configurations to force more aggressive cleanup?
  • Could frequent calls to collect() on small DataFrames still cause driver OOM issues? What alternatives can I use?
  • Should I avoid caching DataFrames even if they are used multiple times within a microbatch? How can I optimize the caching strategy?
  • Are there specific configurations or practices to better manage driver metadata and prevent memory bloat?
The goal is to find a solution to manage and optimize driver memory usage effectively.
 
I look forward to your suggestions and insights on resolving this issue.
3 REPLIES

xorbix_rshiva
Contributor

From the information you provided, your issue might be resolved by setting a watermark on the streaming dataframe. The purpose of watermarks is to set a maximum time for records to be retained in state. Without a watermark, records in your state will accumulate in memory, eventually resulting in an OOM error. Additionally, your job could have other performance hits as state accumulates over time.

In your case, assuming you don't need to retain all records in state for the lifetime of the job, you should set a reasonable window after which records are removed from state. For example, you could apply a 10-minute watermark like this:

`df.withWatermark("event_time", "10 minutes")`

Please refer to this Databricks documentation article on watermarks, including code examples: https://docs.databricks.com/en/structured-streaming/watermarks.html
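
Note that a watermark only takes effect together with a stateful operator (a windowed aggregation, stream-stream join, dropDuplicates, etc.). A rough sketch, with event_time and device_id as placeholder column names:

```python
from pyspark.sql import functions as F

# The watermark bounds how long per-key window state is retained;
# event_time and device_id are placeholder column names.
windowed_counts = (
    df.withWatermark("event_time", "10 minutes")
      .groupBy(F.window("event_time", "5 minutes"), "device_id")
      .count()
)
```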

@xorbix_rshiva thanks for the reply! The streaming app does not keep state (it only uses foreachBatch), so a watermark is unfortunately irrelevant and not the solution here.

gardnmi1983
New Contributor II

Did you ever figure out what is causing the memory leak? We are experiencing a nearly identical issue where memory gradually increases over time and the driver OOMs after a few days.

I did track down an open bug ticket that reports a memory leak when a dataset is persisted, even if it is later unpersisted.

https://issues.apache.org/jira/browse/SPARK-35262
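
If that bug is what we're hitting, one workaround I'm considering (just an assumption on my end, not a confirmed fix) is to either drop the cache() calls inside foreachBatch or force a synchronous unpersist so the cached blocks are actually released before the next micro-batch:

```python
def process_batch(batch_df, batch_id):
    batch_df.persist()
    try:
        batch_df.write.mode("append").saveAsTable("target_table")
    finally:
        # blocking=True waits until all cached blocks are removed
        # before the next micro-batch starts.
        batch_df.unpersist(blocking=True)
```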
