MuraliTalluri
Databricks Employee

This article is a companion to this Databricks blog about a gaming sessionization use case, with a GitHub repo — a self-serve example you can import into your Databricks workspace and run end-to-end to see Real-Time Mode in action.

In this article, we deep dive into how we built a real-time gaming sessionization pipeline using Real-Time Mode (RTM) and transformWithState — tracking millions of active gaming device sessions and producing continuous heartbeats with sub-second precision. First, we walk through the StatefulProcessor implementation in detail — covering state management with MapState, the timer lifecycle that drives proactive heartbeat emission, and the key design decisions behind the pipeline. Second, we run the same pipeline in micro-batch mode and examine why end-to-end p99 latency climbs into the seconds range — then switch to Real-Time Mode and show how concurrent execution delivers sub-second p99 latency on the same code. Lastly, we cover monitoring with StreamingQueryListener, production considerations, and the benchmark results. The goal is to give you everything you need to implement this pattern yourself.

 

Use Case Overview and Pipeline Architecture

This companion blog covers the gaming sessionization use case in detail — why session tracking matters, the four scenarios the pipeline handles (session start, heartbeat, end, and timeout), and the end-to-end pipeline architecture. Here is the architecture diagram:

 

MuraliTalluri_0-1780091061871.png

 

Implementation Deep Dive

Reading from Kafka

The pipeline subscribes to two Kafka topics in a single read — pc_sessions for PC events and console_sessions for console events. Each topic carries JSON-encoded session events with the same logical schema, but with different device identifier fields: hostPcId for PC and openPsid for console. The stream reads from both topics, parses the JSON payload, and normalizes the device identifier into a unified deviceId:

val input_stream_df = spark.readStream
  .format("kafka")
  .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic")
  ...
  .withColumnRenamed("timestamp", "kafka_timestamp")
  .withColumn("session", from_json(col("value"), kafka_schema))
  .withColumn(
    "deviceId",
    when(col("topic") === pc_sessions_topic, col("session.hostPcId"))
      .when(col("topic") === console_sessions_topic, col("session.openPsid"))
      .otherwise(lit(null))
  )
  .selectExpr(
    "topic", "partition", "offset", "kafka_timestamp", "deviceId",
    "session.psnAccountId", "session.appSessionId", "session.eventId",
    "session.timestamp as session_timestamp", "session.totalFgTime"
  )

A few things to note:

  • kafka_timestamp — the upstream Kafka log timestamp, preserved for calculating end-to-end latency.
  • selectExpr — the columns selected here map directly to InputRow, the typed case class used by the StatefulProcessor.
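For orientation, here is a rough sketch of what InputRow could look like. The field names come straight from the selectExpr above, but the field types are assumptions (the actual case class lives in the companion repo and may differ), and the sample values, including the "SessionStart" event ID, are made up:

```scala
import java.sql.Timestamp

// Hypothetical shape of InputRow: field names mirror the selectExpr columns,
// field types are assumed and may differ from the companion repo.
final case class InputRow(
  topic: String,
  partition: Int,
  offset: Long,
  kafka_timestamp: Timestamp,
  deviceId: String,
  psnAccountId: String,
  appSessionId: String,
  eventId: String,
  session_timestamp: Timestamp,
  totalFgTime: Long
)

object InputRowSketch {
  // Made-up sample row, only to show how the columns line up with the fields.
  val sample: InputRow = InputRow(
    topic = "pc_sessions", partition = 0, offset = 42L,
    kafka_timestamp = new Timestamp(1000L),
    deviceId = "device-1", psnAccountId = "account-1",
    appSessionId = "session-1", eventId = "SessionStart",
    session_timestamp = new Timestamp(900L), totalFgTime = 0L
  )
}
```

Because .as[InputRow] matches columns to fields by name, the selectExpr aliases (for example session_timestamp) have to match the case class field names exactly.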

Implementing Stateful Sessionization with transformWithState

State Setup

The parsed input stream is grouped by deviceId and passed to transformWithState, which applies the Sessionization processor — a custom StatefulProcessor that manages the full session lifecycle for each device:

val processed_stream_df = input_stream_df.as[InputRow]
  .groupByKey(_.deviceId)
  .transformWithState(
    new Sessionization(),
    TimeMode.ProcessingTime,
    OutputMode.Update()
  )

groupByKey(_.deviceId) ensures all events for a given device are routed to the same processor instance — this is what makes per-device state possible. TimeMode.ProcessingTime means timers fire on wall-clock time, not event-time watermarks — the right choice when heartbeats should follow a real-world cadence regardless of data arrival patterns. OutputMode.Update emits only the rows produced by each processing cycle, not the full state.

MapState keyed by appSessionId tracks the active session per device. When a new SessionStart arrives for a device that already has an active session, the processor can look up and end the existing session before starting the new one.

Session Start

When a SessionStart event arrives for a device, the processor stores the session in MapState — each deviceId group maintains its own state instance, so sessions are tracked independently per device. The key piece is what happens next: the processor captures the current processing time and registers a timer 30 seconds into the future. This is the moment the pipeline transitions from reactive to proactive — from this point on, handleExpiredTimer() takes over.

// get current processing time and calculate 30 seconds after it
val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs()
val timerMillis = currentProcessingTimeMillis + TIMER_THRESHOLD_IN_MS

// Store session in state
sessionStatusState.updateValue(inputRow.appSessionId, sessionValue)

// Register the first timer for 30 seconds from now — kicks off the heartbeat loop
getHandle.registerTimer(timerMillis)

Heartbeat Loop 

When a timer for a device expires, it flows into handleExpiredTimer() as an expired timer instance. The processor gets the expired timer info, calculates the next timer 30 seconds out, computes the session duration, and emits a heartbeat with the session duration:

// Get the expired timer info
val expiredTimerMillis = expiredTimerInfo.getExpiryTimeInMs()
val expiredTimerTime = new Timestamp(expiredTimerMillis)

// Create a new timer for next 30 sec, current timer expiry time + 30 sec
val nextTimerMillis = expiredTimerMillis + TIMER_THRESHOLD_IN_MS
val nextTimerTime = new Timestamp(nextTimerMillis)

// Calculate session duration in whole seconds:
// current processing time - session start processing time
val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs()
val currentSessionDuration: Long =
  (currentProcessingTimeMillis - sessionValue.processing_timestamp.getTime) / 1000

// Re-register timer for next 30 seconds
getHandle.registerTimer(nextTimerMillis)

getHandle.registerTimer(nextTimerMillis) is what keeps the heartbeat loop running — each timer re-registers the next one, continuing every 30 seconds until a SessionEnd event arrives or the session times out.
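To make the cadence concrete, here is a small plain-Scala sketch (no Spark involved) of the timer chain. The object and function names are hypothetical. It assumes the first timer is registered 30 seconds after the session start and that every later timer is derived from the previous expiry time, as in the snippet above:

```scala
object HeartbeatLoopSketch {
  // Assumed to match the 30-second cadence described in the article.
  val TimerThresholdInMs: Long = 30000L

  // Expiry times of the first `count` timers for a session that started at
  // `startMs`. Each timer is the previous expiry plus the threshold.
  def timerExpiries(startMs: Long, count: Int): List[Long] =
    Iterator
      .iterate(startMs + TimerThresholdInMs)(_ + TimerThresholdInMs)
      .take(count)
      .toList
}
```

For a session starting at 0, timerExpiries(0L, 4) yields 30000, 60000, 90000, and 120000 ms. Because each timer is computed from the expired timer's expiry time rather than from the current processing time, a late-firing timer does not push later heartbeats off the 30-second grid.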

Session End

When a SessionEnd event arrives in handleInputRows(), the processor emits a SessionEnd output, removes the session from MapState, and deletes the timer for that device — stopping the heartbeat loop:

// Remove the session from state
sessionStatusState.removeKey(inputRow.appSessionId)

// Delete the timer for the device
val timerIter = getHandle.listTimers()
for (timer <- timerIter) getHandle.deleteTimer(timer)

If a new SessionStart arrives while a session is already active, the processor ends the existing session first — iterating the MapState, emitting a SessionEnd for the existing active session, and cleaning up timers — before starting the new one. This guarantees only one session is active per device at any time.
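The one-active-session rule can be modeled without Spark. The sketch below uses a mutable map as a stand-in for the per-device MapState. The class and method names are hypothetical, and timer cleanup is only noted in a comment:

```scala
import scala.collection.mutable

object SingleActiveSessionSketch {
  // Stand-in for one device's MapState: appSessionId -> session start time (ms).
  final class DeviceState {
    val sessions: mutable.Map[String, Long] = mutable.Map.empty

    // Ends whatever session is already active, then stores the new one.
    // Returns the IDs of the sessions that were ended.
    def onSessionStart(appSessionId: String, nowMs: Long): List[String] = {
      val ended = sessions.keys.toList
      sessions.clear() // the real processor would also delete the device's timers here
      sessions.update(appSessionId, nowMs)
      ended
    }
  }
}
```

Starting session "a" and then session "b" on the same device returns List("a") from the second call and leaves only "b" in state.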

Session Timeout

If no SessionEnd event arrives and the session exceeds the maximum allowed duration (1800 seconds, for example), the timeout is handled inside handleExpiredTimer(). When the timer fires and the computed duration crosses the threshold, instead of emitting a heartbeat and re-registering the timer, the processor emits a SessionEnd, removes the session from state, and stops the heartbeat loop:

if (currentSessionDuration > SESSION_THRESHOLD_IN_SECONDS) {
  // Session exceeded max duration — emit SessionEnd and clean up
  outputRows.append(OutputRow(..., "SessionEnd", currentSessionDuration, ...))
  sessionStatusState.removeKey(sessionId)
  // No re-register — the heartbeat loop stops
}

This is the safety valve — it ensures the state doesn't grow unbounded for sessions that never receive an explicit end event.
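Putting the heartbeat and timeout branches side by side, the decision made on each timer expiry can be sketched as a pure function. This is an illustration with hypothetical names, assuming the 30-second cadence and the 1800-second example threshold from the text:

```scala
object TimerDecisionSketch {
  val TimerThresholdInMs: Long = 30000L       // heartbeat cadence from the article
  val SessionThresholdInSeconds: Long = 1800L // example maximum duration from the article

  sealed trait Decision
  final case class Heartbeat(durationSeconds: Long, nextTimerMs: Long) extends Decision
  final case class SessionEnd(durationSeconds: Long) extends Decision

  // Decide what a timer expiry should produce for a session.
  def onTimer(sessionStartMs: Long, nowMs: Long, expiredTimerMs: Long): Decision = {
    val durationSeconds = (nowMs - sessionStartMs) / 1000
    if (durationSeconds > SessionThresholdInSeconds) SessionEnd(durationSeconds)
    else Heartbeat(durationSeconds, expiredTimerMs + TimerThresholdInMs)
  }
}
```

Note the strict comparison: a timer that fires at exactly 1800 seconds still produces a heartbeat, and the session is closed by the next timer, at 1830 seconds.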

 

Running the Pipeline in Micro-Batch Mode

To run this pipeline in micro-batch mode, we use Trigger.ProcessingTime("0.5 seconds"), which is the default trigger interval. The cluster used is 8 x m6gd.4xlarge (16 cores and 64 GB memory per worker), making a total of 128 executor cores, running on DBR 18.x.

Sequential Stage Execution Within a Micro-Batch

In micro-batch mode, each batch executes its stages sequentially. The pipeline has two stages: reading from Kafka and shuffling the input data by deviceId. The read stage reads from 32 Kafka input partitions (16 per topic), producing 32 tasks. The shuffle stage repartitions the data across 128 shuffle partitions for the transformWithState operator, producing 128 tasks. Within a single micro-batch, each stage must fully complete before the next one begins.

SparkUI showing sequential stage execution within a micro-batch:

MuraliTalluri_1-1780091107939.png

Sequential Batch Execution: Where Seconds Add Up

The bigger latency problem isn't within a single batch — it's across batches. Micro-batches execute sequentially: batch N must fully complete before batch N+1 begins.

If micro-batches N and N+1 each take 4 seconds to finish, a record that arrives just after batch N starts is not picked up until batch N+1 begins, and is not written to the sink until batch N+1 completes: roughly 7-8 seconds after it landed on the input topic. That shows up as an end-to-end latency spike.
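The arithmetic behind that estimate can be written down as a simplified model. The names are hypothetical, and the model assumes both batches take the same time, output becomes visible only when a batch completes, and there is no gap between batches:

```scala
object SequentialBatchLatencySketch {
  // End-to-end latency (seconds) for a record that arrives `arrivalOffsetSec`
  // after batch N started: it waits for the rest of batch N, then for all of N+1.
  def endToEndLatencySec(batchDurationSec: Double, arrivalOffsetSec: Double): Double = {
    require(arrivalOffsetSec >= 0 && arrivalOffsetSec <= batchDurationSec,
      "arrival offset must fall within batch N")
    (batchDurationSec - arrivalOffsetSec) + batchDurationSec
  }
}
```

With 4-second batches, a record arriving 0.5 seconds into batch N sees 7.5 seconds of latency, and one arriving right at the start sees the full 8 seconds. Even the luckiest record, arriving just as batch N finishes, still waits about 4 seconds.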

StreamingQueryListener output showing consecutive micro-batches taking 3-4 seconds each. Screenshot from SparkUI showing execution time for two consecutive batches:

MuraliTalluri_2-1780091107939.png

This is exactly what the benchmark confirms — micro-batch mode on this pipeline produces a p99 of ~8.5 seconds. The end-to-end latency is primarily driven by sequential batch execution.

Timer Precision in Micro-Batch Mode

Only timers that have already expired when a micro-batch starts are fired in that batch (executing handleExpiredTimer() for each timer instance), that is, only timers where expiredTimerInfo.getExpiryTimeInMs() <= micro-batch start time. A timer that expires in the middle of a micro-batch is not fired until the next micro-batch, and its output is not visible until that batch completes. In addition, if a micro-batch has to process millions of expired timer instances, it adds latency for the records waiting to be processed in the next micro-batch.
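The firing rule is easy to model. In this sketch (hypothetical names, batch start times assumed to be sorted in ascending order), a timer fires in the first batch whose start time is at or after the timer's expiry:

```scala
object MicroBatchTimerSketch {
  // Start time of the micro-batch in which a timer fires, if any of the
  // given batches qualifies. `batchStartsMs` must be sorted ascending.
  def firingBatchStart(batchStartsMs: Seq[Long], timerExpiryMs: Long): Option[Long] =
    batchStartsMs.find(_ >= timerExpiryMs)
}
```

With batches starting at 0, 4000, and 8000 ms, a timer that expires at 5000 ms is fired by the batch starting at 8000 ms, three seconds late, and its heartbeat only reaches the sink once that batch completes.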

 

Switching to Real-Time Mode

To switch the same pipeline to Real-Time Mode, the first change is the trigger. Instead of discrete micro-batches, you specify how often the stream should checkpoint. Here, 5 minutes tells the engine: keep processing continuously until you hit 5 minutes, then checkpoint and start again. Essentially, we are converting this to longer micro-batches with concurrent stage execution:

.trigger(
  if (mode == "RTM") RealTimeTrigger.apply("5 minutes")
  else Trigger.ProcessingTime("0.5 seconds")
)

Concurrent Stage Execution

The fundamental difference: in RTM, stages execute concurrently. The read stage and shuffle stage run in parallel — data flows continuously from Kafka through the transformWithState operator to the output topic without waiting for one stage to fully complete before the next begins. RTM uses streaming shuffle to facilitate this — enabling data to flow between stages continuously rather than being materialized and exchanged in full between sequential stages as in micro-batch mode. 

A record that lands on Kafka is read, shuffled, processed through the stateful operator, and written to the output topic in a single continuous flow. SparkUI showing concurrent stage execution in RTM:

MuraliTalluri_3-1780091107940.png

Dividing the Cluster Between Stages

Because both stages run concurrently, every task needs a dedicated core for the duration of the long batch. This means the cluster's cores must be divided between the read stage and the shuffle stage. The cluster has 128 executor cores (8 x m6gd.4xlarge, 16 cores each). The Kafka source has 32 input partitions (16 per topic). In RTM, we use maxPartitions (only available in RTM) to coalesce those 32 partitions into 16 source tasks — allocating 16 cores to the read stage:

spark.readStream
  .format("kafka")
  .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic")
  .pipeIf(mode == "RTM")(_.option("maxPartitions", 16))
  ...
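Note that pipeIf in the snippet above is not a built-in DataStreamReader method. It is presumably a small helper defined in the companion repo. A minimal sketch of how such a helper could be written, assuming it simply applies a function when a condition holds (the object and class names here are hypothetical):

```scala
object PipeIfSketch {
  // Hypothetical helper: applies `f` only when `condition` holds, otherwise
  // returns the value unchanged. Works on any type, including builder-style readers.
  implicit class PipeIfOps[A](private val self: A) extends AnyVal {
    def pipeIf(condition: Boolean)(f: A => A): A =
      if (condition) f(self) else self
  }
}
```

After import PipeIfSketch._, any value gains pipeIf, which keeps a fluent chain intact while making one option conditional on the mode.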

The remaining cores go to the shuffle stage:

if (mode == "RTM") {
  spark.conf.set("spark.sql.shuffle.partitions", "112")
} else {
  spark.conf.set("spark.sql.shuffle.partitions", "128")
}

The total task count (16 source tasks + 112 shuffle tasks = 128) exactly matches the available cores. If the task count exceeds the available cores, RTM throws an exception.
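The split is simple arithmetic, but it is worth encoding as a check so that a cluster resize does not silently break the budget. A sketch with hypothetical names, assuming one core per task:

```scala
object CoreBudgetSketch {
  // Shuffle partitions left over once the source tasks have claimed their cores.
  def shufflePartitionsFor(totalCores: Int, sourceTasks: Int): Int = {
    require(sourceTasks > 0 && sourceTasks < totalCores,
      "source tasks must leave at least one core for the shuffle stage")
    totalCores - sourceTasks
  }
}
```

For the cluster in this article, shufflePartitionsFor(8 * 16, 16) gives 112, the value set for spark.sql.shuffle.partitions in RTM.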

Timer Precision in Real-Time Mode

Unlike micro-batch mode, where timers can fire only at micro-batch boundaries, RTM runs handleExpiredTimer() as soon as a timer expires, with millisecond precision.

 

Where Micro-Batch Mode Falls Short for This Use Case

The screenshots below show the actual output for a single device in micro-batch mode (MBM) vs. RTM. The business requirement is a heartbeat every 30 seconds that reports that a session is alive and how long it has been running.

In RTM, heartbeats fire consistently every 30 seconds — 30s, 60s, 90s, 120s — exactly as the business expects:

MuraliTalluri_4-1780091107940.png

In MBM, the same device's heartbeats are irregular — 0s, then 50s, then 71s — because timers can only fire at micro-batch boundaries. The 30-second cadence the code registers is not the cadence the downstream consumer sees. 

MuraliTalluri_5-1780091107940.png

For use cases where consistent, predictable timer-driven output and consistent end-to-end latency matter — session tracking, IoT heartbeats, real-time alerting — this is the gap that Real-Time Mode closes.

 

Monitoring with StreamingQueryListener

To observe the pipeline's behavior, we attach a StreamingQueryListener implementation that logs key metrics on every progress event:

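import org.apache.spark.sql.streaming.StreamingQueryListener
import org.json4s._
import org.json4s.jackson.JsonMethods.{parse, pretty, render}

class CustomStreamingQueryListener extends StreamingQueryListener {
  // onQueryStarted and onQueryTerminated are abstract, so they need (no-op) implementations
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val progress = event.progress
    // The pipeline has a single stateful operator (transformWithState), at index 0
    val stateOperators = progress.stateOperators(0)
    val stateMetrics = stateOperators.customMetrics

    // Batch-level metrics
    println(s"batchId=${progress.batchId} numInputRows=${progress.numInputRows} " +
      s"batchDuration=${progress.batchDuration}")
    // State store and timer metrics
    println(s"numRowsTotal=${stateOperators.numRowsTotal} " +
      s"numRowsUpdated=${stateOperators.numRowsUpdated} " +
      s"numExpiredTimers=${stateMetrics.get("numExpiredTimers")} " +
      s"numRegisteredTimers=${stateMetrics.get("numRegisteredTimers")} " +
      s"rocksdbPutLatency=${stateMetrics.get("rocksdbPutLatency")} " +
      s"timerProcessingTimeMs=${stateMetrics.get("timerProcessingTimeMs")}")

    // RTM-only: latencies JSON (mode is the same variable used to pick the trigger)
    if (mode == "RTM") {
      val progress_json = parse(progress.json)
      val latenciesJson = progress_json \ "latencies"
      println(pretty(render(latenciesJson)))
    }
  }
}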

In MBM, these metrics are reported per micro-batch. In RTM, they are aggregated for the entire checkpoint interval (5 minutes).

In RTM, StreamingQueryProgress also includes built-in latency metrics: processing latency, source queuing latency, and end-to-end latency percentiles, reported directly by the engine. These are not available in micro-batch mode.

 

Latency: RTM vs. Micro-Batch

Latency is measured end-to-end — from the input Kafka record timestamp to the output Kafka record timestamp. Both modes run on the same cluster (8 x m6gd.4xlarge, 128 executor cores, DBR 18.x) with the same data — ~500K input events per minute, ~4M active sessions, and ~8M heartbeat records produced per minute.

RTM delivers a roughly 20x improvement in p99 latency (432 ms vs. ~8.5 s), on the same code and the same cluster.
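As a sanity check, the headline numbers hang together. The sketch below (hypothetical names) assumes one heartbeat per active session every 30 seconds and uses the p99 figures quoted in this article:

```scala
object BenchmarkArithmeticSketch {
  // Heartbeats produced per minute if every active session emits one
  // heartbeat per interval.
  def heartbeatsPerMinute(activeSessions: Long, heartbeatIntervalSec: Long): Long =
    activeSessions * 60L / heartbeatIntervalSec

  // Ratio of baseline latency to improved latency.
  def speedup(baselineMs: Double, improvedMs: Double): Double =
    baselineMs / improvedMs
}
```

heartbeatsPerMinute(4000000L, 30L) gives 8,000,000, matching the ~8M heartbeat records per minute, and speedup(8500.0, 432.0) comes out just under 20.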

 

MuraliTalluri_6-1780091107940.png

Conclusion

This deep dive walked through the internals of a real-time gaming sessionization pipeline — from how MapState tracks sessions per device, to how handleInputRows() processes session starts and ends, to how handleExpiredTimer() drives the heartbeat loop, to why micro-batch execution creates a structural latency floor and how RTM eliminates it.

Key takeaways:

  • transformWithState handles both reactive and proactive processing in a single StatefulProcessor — input events and timer-driven output in one class, one codebase.
  • Micro-batch mode bounds both input processing and timer precision to batch boundaries — for timer-heavy workloads, this means irregular heartbeats and end-to-end latency spikes driven by sequential batch execution. This is the gap that Real-Time Mode closes.
  • RTM delivers consistent sub-second latency — 432ms p99 vs 8.5s p99 in micro-batch, on the same code and the same cluster.
  • Switching is a trigger change, not a rewrite — the same StatefulProcessor, the same state, the same Kafka topics.

The pattern isn't limited to gaming. Any workload that needs timer-driven output with consistent end-to-end latency — IoT heartbeats, equipment monitoring, session tracking, real-time alerting — can be built the same way.

Try it yourself: