I've seen a similar issue with large state when using flatMapGroupsWithState. There are two likely causes: (A) the timeout is not being set correctly (state.setTimeoutTimestamp or state.setTimeoutDuration), or (B) state.remove() is not called once the stored state has timed out, so the state keeps growing.
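
    import org.apache.spark.sql.streaming.GroupState

    // A = input record type, B = state type, C = output type
    def groupedStateFunc(key: String, records: Iterator[A], state: GroupState[B]): Iterator[C] = {
      if (state.hasTimedOut) {
        // records is empty on a timeout call; build any final output from the stored state
        val result = state.getOption match { ... }
        state.remove() // if this isn't called, the state will leak
        result
      } else {
        val updated = state.getOption match {
          case Some(lastState) => // merge the new records into lastState
          case None            => // first call for this key: create the initial state from records
        }
        state.update(updated)
        // Spark clears the timeout every time this function is called, so set it again on every call.
        // If it is never set, or set too far in the future (e.g. a value in the wrong time unit),
        // the key never times out and the state keeps growing.
        // setTimeoutTimestamp expects epoch milliseconds and requires GroupStateTimeout.EventTimeTimeout
        // plus a watermark defined on the input with withWatermark.
        state.setTimeoutTimestamp(timestampWhenStateWillExpire)
        Iterator.empty
      }
    }
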
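To see why a missing state.remove() makes the state grow, here is a rough toy model in plain Scala with no Spark involved. ToyStateStore, Entry and runBatch are made-up names; they only mimic the documented GroupState behavior that timeouts are cleared on each call and fire once the watermark passes them. It is a simplification, not how Spark's state store is implemented.

```scala
import scala.collection.mutable

// Toy model (NOT Spark's API): per-key state with an event-time timeout in epoch ms.
final case class Entry(count: Long, timeoutMs: Long)

final class ToyStateStore(removeOnTimeout: Boolean) {
  private val store = mutable.Map.empty[String, Entry]
  private val NoTimeout = Long.MaxValue // stands in for "no timeout set"

  def size: Int = store.size

  // One micro-batch: event times (epoch ms) per key, the current watermark and a TTL.
  // Returns (key, final count) for every key that timed out in this batch.
  def runBatch(records: Map[String, Seq[Long]], watermarkMs: Long, ttlMs: Long): Seq[(String, Long)] = {
    // Keys with new data: merge into the previous state and set the timeout again.
    for ((key, eventTimes) <- records if eventTimes.nonEmpty) {
      val previous = store.get(key).map(_.count).getOrElse(0L)
      store(key) = Entry(previous + eventTimes.size, eventTimes.max + ttlMs)
    }
    // Keys without new data whose timeout is behind the watermark have timed out.
    val timedOut = store.iterator.collect {
      case (key, e) if !records.contains(key) && e.timeoutMs < watermarkMs => key -> e.count
    }.toList
    timedOut.foreach { case (key, _) =>
      if (removeOnTimeout) store.remove(key) // the state.remove() path
      else store.update(key, store(key).copy(timeoutMs = NoTimeout)) // kept forever, never times out again
    }
    timedOut
  }
}

// A new key arrives every minute, the watermark lags 10 s behind,
// and each key should expire 30 s after its last event.
val leaky = new ToyStateStore(removeOnTimeout = false)
val clean = new ToyStateStore(removeOnTimeout = true)
for (batch <- 0 until 5) {
  val eventMs = batch * 60000L
  val records = Map(s"key-$batch" -> Seq(eventMs))
  leaky.runBatch(records, watermarkMs = eventMs - 10000L, ttlMs = 30000L)
  clean.runBatch(records, watermarkMs = eventMs - 10000L, ttlMs = 30000L)
}
println(s"leaky: ${leaky.size}, clean: ${clean.size}") // leaky: 5, clean: 1
```

The leaky store keeps one entry per key it has ever seen, while the store that removes timed-out entries only holds the currently active key.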
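If that isn't the cause, it could also be a problem with how time is represented. setTimeoutTimestamp takes epoch milliseconds, so a timestamp passed in microseconds or nanoseconds points far into the future and the key never times out. For each micro-batch, the streaming query progress reports the event-time min/avg/max and the current watermark (StreamingQueryProgress.eventTime), along with state metrics such as the number of rows held (stateOperators). Comparing those values with the timeouts you are setting can help narrow it down.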
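One way to rule out a unit mix-up is to normalize timestamps before computing the expiry. The sketch below is a hypothetical debugging helper (toEpochMillis and expiryMillis are not Spark APIs) that guesses the unit from the magnitude of the value and converts it with java.util.concurrent.TimeUnit. The thresholds assume dates between about 1973 and the year 5000; earlier millisecond values would be misread as seconds.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of Spark. Present-day epoch values are roughly
// 1.7e9 (seconds), 1.7e12 (ms), 1.7e15 (microseconds) and 1.7e18 (nanoseconds).
val SecondsLimit: Long = 100000000000L       // 1e11
val MillisLimit: Long = 100000000000000L     // 1e14
val MicrosLimit: Long = 100000000000000000L  // 1e17

// Guess the unit from the magnitude and convert to epoch milliseconds.
def toEpochMillis(ts: Long): Long =
  if (ts < SecondsLimit) TimeUnit.SECONDS.toMillis(ts)
  else if (ts < MillisLimit) ts
  else if (ts < MicrosLimit) TimeUnit.MICROSECONDS.toMillis(ts)
  else TimeUnit.NANOSECONDS.toMillis(ts)

// Hypothetical expiry calculation: event time plus a time-to-live, both in ms.
def expiryMillis(eventTs: Long, ttlMs: Long): Long = toEpochMillis(eventTs) + ttlMs
```

Inside the else branch you could then call something like state.setTimeoutTimestamp(expiryMillis(latestEventTs, ttlMs)), where latestEventTs and ttlMs are placeholders. If the source schema documents its time unit, converting explicitly is safer than guessing.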