Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming state is growing too large

User16826994223
Honored Contributor III

I have a customer with a streaming pipeline from Kafka to Delta. They are using RocksDB for the state store, a 30-minute watermark, and dropDuplicates. Their state has grown to 6.2 billion rows on a stream that peaks at about 7,000 rows per second. That is far larger than the maximum possible number of rows in a 30-minute window at peak traffic (about 12.6 million). Does anyone have any ideas or has seen this behavior before?
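One thing worth checking first (a sketch with hypothetical column names, assuming they read from Kafka and deduplicate by an `id` column): with `dropDuplicates`, the watermark only purges deduplication state if the event-time column is included in the deduplication columns. Deduplicating on the key alone keeps every distinct key in state forever, which would explain unbounded growth:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Hypothetical source: Kafka records with an id and an event-time column
val events = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS id", "timestamp AS eventTime")

// State grows without bound: the watermark is set but the event-time
// column is not part of the dedup columns, so state is never evicted.
val leaky = events
  .withWatermark("eventTime", "30 minutes")
  .dropDuplicates("id")

// State is purged once eventTime falls behind the watermark, because the
// event-time column participates in the deduplication.
val bounded = events
  .withWatermark("eventTime", "30 minutes")
  .dropDuplicates("id", "eventTime")
```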


shaines
New Contributor II

I've seen a similar issue with large state when using flatMapGroupsWithState. It is possible that (a) they are not setting the state timeout correctly, or (b) they are not calling state.remove() when the stored state has timed out, leaving the state to grow without bound.

def groupedStateFunc(key: String, records: Iterator[A], state: GroupState[B]): Iterator[C] = {
  if (state.hasTimedOut) {
    // Emit whatever final output the expiring state should produce
    val result = state.getOption match { ... }
    state.remove() // if this isn't called the state will leak
    result
  } else {
    val newState = state.getOption match {
      case Some(lastState) => // merge the new records into the existing state
      case None => // first records for this key - build the initial state
    }
    state.update(newState)
    // If the timeout isn't set to an explicit point in the future (or the value
    // is too far out), records never time out and the state keeps growing.
    state.setTimeoutTimestamp(timestampWhenStateWillExpire)
    Iterator.empty
  }
}
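For completeness, here is a sketch (names hypothetical) of how a function like this gets wired into the query. Event-time timeouts only fire if the timeout mode is passed to flatMapGroupsWithState and a watermark is defined on the stream; if the timeout mode is not configured, setTimeoutTimestamp throws, and without a watermark the timeout never fires:

```scala
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

// `events` is assumed to be a Dataset with `id` and `eventTime` columns,
// and `groupedStateFunc` the state function sketched above.
val output = events
  .withWatermark("eventTime", "30 minutes")   // required for EventTimeTimeout
  .groupByKey(_.id)
  .flatMapGroupsWithState(
    OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout        // without this, setTimeoutTimestamp throws
  )(groupedStateFunc)
```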

If this isn't the case, it could also be an issue with how time is represented. The StateStore tracks watermark metrics for each microbatch as timestamps in epoch milliseconds (the min/avg/max watermark thresholds), so inspecting those metrics to see whether the boundaries look sane can help as well.
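One concrete way the time representation goes wrong (a plain-Scala sketch, values hypothetical): setTimeoutTimestamp expects epoch milliseconds, so a value that is already in milliseconds but gets multiplied by 1000 again lands tens of thousands of years in the future and effectively never expires:

```scala
import java.time.Instant

val eventTimeMillis = 1700000000000L             // a 2023 timestamp, in epoch ms
val accidentallyScaled = eventTimeMillis * 1000L // treated as if it were seconds

// A timeout at eventTimeMillis expires in 2023; the scaled one, never in practice.
println(Instant.ofEpochMilli(eventTimeMillis))   // 2023-11-14T22:13:20Z
println(Instant.ofEpochMilli(accidentallyScaled))
```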
