Data Engineering

OOM Issue in Streaming with foreachBatch()

dzsuzs
New Contributor II
I have a stateless streaming application that uses foreachBatch. Depending on custom logic, the function executes between 10 and 400 times per hour.
 
The logic within foreachBatch includes (a minimal sketch follows the list): 
  1. collect() on very small DataFrames (a few megabytes); the driver has more than 20 GB of memory, so this alone should not be an issue
  2. Caching DataFrames and then unpersisting them 
  3. Converting a single row to a DF
  4. Performing a cross join on a very small DataFrame 
  5. Various filtering operations 
  6. Writing the DataFrame to the target_table in append mode.
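
A minimal, self-contained sketch of the pattern above, assuming hypothetical key/value columns, a rate source standing in for the real stream, and a target table named target_table:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical source: a rate stream dressed up with key/value columns
source_df = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .select(F.col("value").cast("string").alias("key"), F.col("value"))
)

def process_batch(batch_df, batch_id):
    # (2) cache because the micro-batch is reused several times below
    batch_df.cache()
    try:
        # (1) collect() on a very small DataFrame
        rows = batch_df.limit(10).collect()
        if not rows:
            return
        # (3) convert a single row back into a DataFrame
        single_row_df = spark.createDataFrame([rows[0]])
        # (4) cross join on a very small DataFrame
        joined = batch_df.crossJoin(
            single_row_df.select(
                F.col("key").alias("cfg_key"), F.col("value").alias("cfg_value")
            )
        )
        # (5) various filtering operations
        filtered = joined.filter(F.col("value") % 2 == 0)
        # (6) append to the target table
        filtered.write.mode("append").saveAsTable("target_table")
    finally:
        # unpersist so cached blocks do not accumulate across micro-batches
        batch_df.unpersist()

query = (
    source_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoints/oom_demo")  # hypothetical path
    .start()
)
```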
Driver memory usage gradually increases over a few days until the job eventually fails with a driver Out of Memory (OOM) error.
 
  • When does Spark remove state from the driver metadata in a streaming application? Are there configurations to force more aggressive cleanup?
  • Could frequent calls to collect() on small DataFrames still cause driver OOM issues? What alternatives can I use?
  • Should I avoid caching DataFrames even if they are used multiple times within a microbatch? How can I optimize the caching strategy?
  • Are there specific configurations or practices to better manage driver metadata and prevent memory bloat?
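
To make the collect() and caching questions concrete, here is a stripped-down sketch of the pattern in question; the limit() bound and the blocking unpersist are illustrative tweaks I am considering, not what the job currently does:

```python
def handle_small_df(small_df):
    small_df.persist()
    try:
        # bound how much collect() can ever pull onto the driver
        rows = small_df.limit(1000).collect()
        # ... use rows several times within the micro-batch ...
        return rows
    finally:
        # blocking=True waits until the cached blocks are actually freed,
        # so they cannot pile up across fast-arriving micro-batches
        small_df.unpersist(blocking=True)
```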
The goal is to find a solution to manage and optimize driver memory usage effectively.
 
I look forward to your suggestions and insights on resolving this issue.
2 REPLIES

xorbix_rshiva
Contributor

From the information you provided, your issue might be resolved by setting a watermark on the streaming DataFrame. A watermark sets a maximum age for records retained in state. Without one, records accumulate in state in memory, eventually resulting in an OOM error, and the job can suffer other performance degradation as state grows over time.

In your case, assuming it is not necessary to retain all records in state for the lifetime of the job, you should set a reasonable window after which records are removed from state. For example, you could apply a 10-minute watermark like this:

`df.withWatermark("event_time", "10 minutes")`

Please refer to this Databricks documentation article on watermarks, including code examples: https://docs.databricks.com/en/structured-streaming/watermarks.html
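
For completeness, here is a minimal end-to-end sketch, assuming an event-time column named event_time and a rate source standing in for the real stream. Note that a watermark only bounds state held by stateful operators (aggregations, stream-stream joins, dropDuplicates):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical event stream with an event_time column
events = (
    spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    .withColumnRenamed("timestamp", "event_time")
)

# Records older than 10 minutes (relative to the max event_time seen so far)
# become eligible for eviction from the aggregation state
windowed_counts = (
    events.withWatermark("event_time", "10 minutes")
          .groupBy(F.window("event_time", "5 minutes"))
          .count()
)

query = (
    windowed_counts.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/watermark_demo")  # hypothetical
    .toTable("event_counts")  # hypothetical target table
)
```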

dzsuzs
New Contributor II

@xorbix_rshiva thanks for the reply! The streaming app does not keep state (it uses foreachBatch), so a watermark is unfortunately irrelevant and not the solution here.
