Technical Blog
craig_lukasik
Databricks Employee

Apache Spark's Real-time Mode Use Case Deep Dive: Ad Attribution

This article is a companion to this Databricks blog about an Ad Attribution use case, with a GitHub repo including sample code that you can use to see Real-Time Mode (RTM) in action.

In this article, we go deep into the underlying architecture and mechanics of real-time mode (RTM) while reflecting on the Ad Attribution use case and the example code. First, we define "operational" use cases and explain how Apache Spark can now handle a new set of latency-sensitive use cases. Second, we review the architectural changes in Spark that enable RTM to achieve such low latency. Lastly, we explain some sample code, showing how, when coupled with transformWithState, ad attribution can be structured and plugged into a real-time stream. The sample code lets you compare the latency of RTM with that of the micro-batch execution mode.

Operational Workloads and Sub-Second Latency

Apache Spark™ and Operational Use Cases

Apache Spark Structured Streaming has long served as the industry standard for analytical streaming workloads, excelling at data ingestion and transformation, following architectures such as the Medallion pattern. However, a significant gap historically existed when addressing truly operational workloads—those systems requiring immediate response to data to trigger downstream actions or decisions, such as real-time fraud flagging or dynamic promotional messaging. These operational systems are characterized by a stringent requirement for sub-second, and often single-digit millisecond, end-to-end latency. Achieving this level of performance historically necessitated the use of specialized streaming engines outside the standard Spark/Databricks environment. This fragmentation compelled organizations to manage disparate data architectures, resulting in increased total cost of ownership (TCO) due to duplicated infrastructure, increased deployment complexity, resource contention, and the need to maintain multiple, distinct skill sets. The introduction of Apache Spark’s real-time mode, available in Databricks Runtime 16.4 LTS and above, directly addresses this issue. RTM delivers continuous, ultra-low-latency processing, pushing p99 latencies down to the single-digit milliseconds, effectively unifying the platform for both analytical and highly demanding operational streaming.

Architectural Trade-Offs: Micro-Batch vs. Continuous Execution vs. Real-Time Mode

The evolution of Structured Streaming execution models has been driven by the perpetual trade-off between latency, throughput, and stability.

Micro-Batch Execution (V1)

Micro-batching relies on the MicroBatchExecution engine, accumulating data into small, discrete batches. While this approach guarantees robust exactly-once processing and high throughput, its core limitation lies in the sequential nature of its execution loop. The execution phase cycles through distinct steps: fetching offsets, retrieving the batch data, planning the query, adding the batch, and finally, committing the write-ahead log (WAL). This sequential process, governed by a set trigger interval, introduces fixed scheduling latency that typically confines micro-batch performance to the seconds-latency range, making it unsuitable for true sub-second operational requirements.
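To make the scheduling latency concrete, here is a minimal micro-batch sketch (sink, path, and the transformedDf DataFrame are illustrative, not from the demo code). The ProcessingTime trigger interval puts a floor under end-to-end latency, because every cycle must complete the full offset/plan/commit loop before the next one starts:

```scala
import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode: each cycle runs the full fetch-plan-commit loop,
// so the trigger interval is a floor on end-to-end latency.
val query = transformedDf.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/demo")  // illustrative path
  .trigger(Trigger.ProcessingTime("1 second"))            // fixed scheduling cadence
  .start("/tmp/tables/demo")
```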

Legacy Continuous Execution

Prior to RTM, Spark offered a "Continuous" execution mode, which processed records individually for lower latency. This mode, however, remained experimental and did not support a rich feature set, such as stateful queries.

Real-Time Mode (RTM)

RTM represents a fundamental architectural evolution that resolves the stability-vs-speed dilemma. RTM runs batches for longer durations (the default length is 5 minutes). Checkpoints, in this context, function similarly to batches. Critically, within this long batch window, RTM processes data as soon as it becomes available using concurrent scheduling. This design maintains the reliable, periodic checkpointing structure of a long-running batch while completely eliminating the scheduling coordination overhead of micro-batch mode. This hybridization strategy enables RTM to offer the robust fault tolerance associated with micro-batching, combined with sub-second latency performance.

Important: Cluster Configuration and Sizing

Successfully deploying RTM requires adhering to a precise set of cluster configurations and strict sizing methodologies. These parameters are not merely optimizations; they are the architectural keys that switch Spark’s behavior from micro-batching to continuous, concurrent processing.

Required Spark Configurations: Analyzing the Setup

The Ad Attribution workload demo provides the definitive configuration template necessary for RTM activation and high-performance operation:

  • Activating RTM: Setting spark.databricks.streaming.realTimeMode.enabled to true is the fundamental switch that enables the streaming query engine to run in RTM.

In Databricks, you can set the Spark confs in the Advanced section of the cluster configuration. Note that this Spark conf must be set at cluster startup, unlike some other Spark confs that are session-scoped and can be changed at runtime.
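For example, the Spark config box in the cluster's Advanced options accepts space-separated key/value pairs, so the RTM flag looks like this:

```
spark.databricks.streaming.realTimeMode.enabled true
```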

The Task Slot Constraint: Ensuring Capacity Lockstep

The most critical operational constraint in RTM sizing is the task slot requirement. To run effectively in ultra-low-latency mode, the total number of available task slots in the compute cluster must be greater than or equal to the total number of tasks across all query stages.

This constraint is vital because the concurrent execution model requires every task in the query graph to have immediate access to dedicated compute resources. If the required number of slots is unavailable—for instance, if the workload generates 100 tasks but the cluster only provides 80 slots—an exception will be thrown that tells you the cluster does not have enough available slots.

This configuration necessitates a shift from reactive resource scaling (typical of batch ETL) to a proactive capacity reservation model. The cluster must be sized to meet the parallelism requirement of the peak load. Unlike micro-batch mode, where resources can be temporarily idled or shared, RTM tasks maintain their allocation within the long batch window, meaning precise right-sizing is crucial to prevent both resource exhaustion and unnecessary compute waste.
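As a back-of-envelope sketch of this sizing rule (all numbers here are illustrative, assuming one task slot per worker core):

```scala
// Illustrative slot-sizing arithmetic (not taken from the demo):
val sourceTasks       = 8   // Kafka source tasks after maxPartitions tuning
val shufflePartitions = 16  // tasks in the stateful/shuffle stage
val requiredSlots     = sourceTasks + shufflePartitions  // 24

val workers        = 3
val coresPerWorker = 8      // e.g., an 8-core worker type, one slot per core
val availableSlots = workers * coresPerWorker            // 24

require(availableSlots >= requiredSlots,
  s"RTM needs $requiredSlots slots but only $availableSlots are available")
```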

Tuning Data Parallelism: Optimizing Utilization

Since capacity is fixed and preemptively reserved, tuning the job’s parallelism is essential for optimizing resource utilization:

  • Source Parallelism (maxPartitions): For input systems like Kafka, which feeds the Ad Attribution pipeline, Databricks recommends setting maxPartitions so that each task handles multiple Kafka partitions. This reduces task overhead and balances data flow across the allocated compute slots. maxPartitions is a new Kafka connector option on Databricks, introduced alongside RTM.
  • Shuffle Parallelism: For shuffle-heavy jobs, such as those involving stateful operations (transformWithState) or complex stream-stream joins, the number of shuffle partitions must be carefully chosen. Engineers should experiment to find the minimum number of shuffle partitions that prevent data backlogs. This count directly impacts the total required task slots (Source tasks + Shuffle tasks).
  • Target Utilization: Tuning parallelism and overall cluster size often targets a goal utilization level (e.g., 50%). This buffer capacity ensures that the system can handle bursts of incoming data without violating the low-latency objective.
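A hedged sketch of the source-parallelism point above (the broker address and topic name are placeholders, not values from the demo):

```scala
// Sketch: cap source-side parallelism so each task reads several Kafka partitions
val sourceStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "ad_events")                  // placeholder topic
  .option("maxPartitions", "8")  // at most 8 source tasks, regardless of topic partition count
  .load()
```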

The physical capacity (task slots) and the logical parallelism (source and shuffle partitions) are inherently linked. If an increase in logical parallelism occurs without a corresponding increase in physical capacity, the RTM job cannot meet its concurrency requirement, resulting in performance degradation.

Production Best Practices

For scheduling RTM workloads, several operational best practices must be observed:

  1. Compute Type: Always schedule RTM streams as jobs using dedicated Jobs Compute. All-purpose compute clusters are not recommended for production streaming workloads.
  2. Scheduling Mode: Jobs should be scheduled in Continuous mode, which configures the job to run indefinitely and restart automatically upon failure.
  3. Autoscaling Prohibition: Due to the rigid task slot constraint required by RTM, autoscaling must be disabled. Scaling delays would destroy the low-latency guarantees.
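In a Databricks Jobs API payload, for example, continuous scheduling is expressed with a continuous block instead of a cron schedule (this fragment is a sketch; cluster and task details are omitted):

```
{
  "name": "rtm_ad_attribution",
  "continuous": { "pause_status": "UNPAUSED" },
  "tasks": [ ... ]
}
```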

Advanced Stateful Logic: Implementing Ad Attribution with transformWithState

RTM provides a high-performance execution environment, but complex operational problems, such as ad attribution, require sophisticated state management capabilities, which the transformWithState operator provides.

The Attribution Challenge: Correlating Disparate Real-Time Events

The Ad Attribution use case involves correlating two distinct, high-volume event streams: Ad Requests and Ad Impressions. These events arrive asynchronously, often out of order, and need to be stitched together across different time windows. Think of the "Request" event as an intention to show an advertisement at a given spot in a video or movie, and the "Impression" (or "ack") as the event that signifies that the ad is being shown to the viewer. Why is the "ack" needed? Primarily for these reasons:

  1. The ack payload may include additional intelligence (e.g., did the user click on or interact with the ad?)
  2. Ad sales are often conditioned on an ad being shown to a specified number of viewers. Once that commitment has been met, the media provider has more ad slots to sell!

For simplicity's sake, we are focusing on these two event types in the sample code. In the real world, there would be additional payload types to help with downstream attribution, such as associating a purchase with an ad impression. For our example, we focus on measuring impressions, as they are a key driver for managing ad inventory. Regardless, the technological challenge is maintaining the state of an active ad request, matching subsequent impressions and callbacks to that request based on complex logic (e.g., matching a request to an impression within 5 minutes, and then the impression to a click within 15 minutes), and ensuring accurate state expiration.

Standard Structured Streaming operators, such as built-in stream-stream joins, can handle some correlation, but they often lack the flexibility required for arbitrary, time-driven state cleanup and the complex object modeling inherent in sophisticated event correlation.

Why transformWithState is Mandatory for Operational Correlation

The transformWithState operator was introduced to enhance Structured Streaming with flexible state management and event-driven programming. It enables developers to implement arbitrary logic that goes beyond the capabilities of built-in operators. For operational workloads like Ad Attribution, several features of transformWithState are mandatory:

  1. Object-Oriented State Management: The operator allows the custom StatefulProcessor implementation to define and store complex, custom state objects (such as tracking the full history of a Request-Impression match object) rather than relying solely on simple key-value mappings.
  2. Composite Data Types: Complex correlations often require composite keys (e.g., combining a User Session ID with a Campaign Identifier) to ensure that all related events are processed by the same state instance. transformWithState handles these composite types efficiently.
  3. Timer-Driven Logic: This is the most crucial feature for attribution. Timers allow the system to register a future action, such as cleaning up state exactly 15 minutes after an impression fires, even if no further event arrives. This timer-driven logic ensures timely state garbage collection, which is essential for maintaining memory bounds and guaranteeing accurate attribution boundary enforcement in a high-volume pipeline.

The application logic and the execution performance (RocksDB health) are thus intrinsically linked. If state is not aggressively expired, the total state size grows, increasing I/O demands during checkpointing and taxing the bounded memory manager, thereby compromising the RTM latency goals.
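The timer mechanics from point 3 can be sketched as follows. This is illustrative only (the method bodies, timeout value, and state names are assumptions; the notebook's real logic lives in LogRecordProcessor):

```scala
// Sketch: register a processing-time timer so state is cleaned up
// even if no further event ever arrives for this key
override def handleInputRows(
    key: String,
    inputRows: Iterator[InputRow],
    timerValues: TimerValues): Iterator[LogRecord] = {
  val timeoutMs = 15 * 60 * 1000L  // illustrative 15-minute attribution window
  getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + timeoutMs)
  // ... correlation logic goes here ...
  Iterator.empty
}

// Invoked by the engine when a registered timer fires for a key
override def handleExpiredTimer(
    key: String,
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[LogRecord] = {
  // Clear the per-key state variables here (e.g., valueState.clear())
  // so unmatched requests do not accumulate indefinitely.
  Iterator.empty
}
```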

StatefulProcessor Implementation Deep Dive

The ad attribution architecture dictates a conceptual structure for the custom StatefulProcessor implementation. The input stream must first be grouped by a composite key (e.g., UserID and CampaignID) to guarantee co-location of related events at a single state instance. In the adtechdata_stream_processor_scala notebook, you will find code that first groups the DataFrame by the transaction_id (think of this as an identifier for a viewing session) and then applies the transformWithState operator (LogRecordProcessor, a class that is defined in the same notebook):

// Group by transaction_id and apply stateful processing 
// using LogRecordProcessor
val transactionDF = sourceStream
  .as[InputRow]
  .groupByKey(_.transaction_id)
  .transformWithState(
    new LogRecordProcessor(timeout_duration = stateTimeoutInmin),
    TimeMode.ProcessingTime,
    OutputMode.Append
  )
  .toDF()

The state management involves defining a complex object that tracks the progress of the attribution attempt. LogRecordProcessor is an instance of StatefulProcessor, which is required for transformWithState. When we look at the signature for LogRecordProcessor, a number of schemas are involved:

class LogRecordProcessor(timeout_duration: Long) extends StatefulProcessor[String, InputRow, LogRecord] {
  1. String: The grouping key is a String. This is the viewer's session identifier (transaction_id in the source DataFrame).
  2. InputRow: This case class (also defined in the notebook) represents the schema of arriving data.
  3. LogRecord: The schema for outgoing records (emitted from the streaming query).
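For orientation, the two row schemas might look roughly like this. The field names below are assumptions for illustration; the authoritative case class definitions are in the notebook:

```scala
// Hypothetical shapes only -- see the notebook for the real case classes
case class InputRow(
  transaction_id: String,          // grouping key: the viewing session
  event_type: String,              // e.g., "request" or "ack"
  event_time: java.sql.Timestamp,
  payload: String)

case class LogRecord(
  transaction_id: String,
  request_time: java.sql.Timestamp,
  ack_time: java.sql.Timestamp)
```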

Event Handling Logic (Conceptual Flow)

  1. Ad Request Arrival: If no state exists for the key, the system initializes the state object, recording the request data and registering an Expiration Timer corresponding to the overall attribution window. In the sample code in the notebook, a timer is registered as rows for a grouping key arrive in the handleInputRows method. This helps keep the state data bounded over time.
  2. Ad Impression ("ack") Arrival: The state is updated, the impression is correlated with the tracked request, and the system may update or clean up state or emit output records.

The combination of RTM’s concurrent, low-latency execution and transformWithState’s precise, custom state control allows Spark to execute these complex, real-time correlation patterns accurately.

The stateful logic is encapsulated in the handleInputRows method of the LogRecordProcessor class, which extends the StatefulProcessor class, making it suitable for use with the grouped DataFrame's transformWithState method. Look in the adtechdata_stream_processor_scala notebook for this method:

 override def handleInputRows(
     key: String,
     inputRows: Iterator[InputRow],
     timerValues: TimerValues
 ): Iterator[LogRecord] = {
....

It is inside this method that the complex, stateful logic resides. Two state variables (defined in the init method) are used:

  1. _ackLogRecordListState: "ack" records are tracked in a list. It is possible for the "request" message to arrive late. By keeping ack records in a "list state" structure, once a matching "request" message arrives, the matching elements can finally be emitted. And, remember, if the "request" never arrives, the timer that was set for the grouping key will ensure that the system does not grow state data in an unbounded manner.
  2. _requestLogRecordState: this "value state" state variable tracks the "request" message and uses it when the "ack" messages arrive to finally emit the matched records.
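These two variables would be declared in init roughly as follows. The state names, encoders, and TTL settings here are assumptions for illustration, not the notebook's exact code:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.{ListState, OutputMode, TimeMode, TTLConfig, ValueState}

@transient private var _requestLogRecordState: ValueState[LogRecord] = _
@transient private var _ackLogRecordListState: ListState[LogRecord] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
  // Value state: holds the single "request" event for this session, if seen
  _requestLogRecordState = getHandle.getValueState[LogRecord](
    "requestLogRecord", Encoders.product[LogRecord], TTLConfig.NONE)
  // List state: buffers "ack" events that may arrive before the request
  _ackLogRecordListState = getHandle.getListState[LogRecord](
    "ackLogRecords", Encoders.product[LogRecord], TTLConfig.NONE)
}
```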

High-Performance State Management Internals

RTM's performance is dependent on the efficiency and stability of the underlying state store, particularly when running sophisticated stateful operators like transformWithState.

RocksDB Integration and Bounded Memory

The stateful Structured Streaming queries on Databricks rely upon the high-performance, embedded key-value store, RocksDB. RocksDB is optimized for flash storage and provides the I/O capacity required for high-volume, low-latency processing.

A key challenge in operational streaming is preventing unbounded memory growth. RTM pipelines handle immense state pressure, and if not managed correctly, state stores can consume all worker memory, leading to instability. To prevent this, RocksDB functionality is leveraged to enforce bounded memory usage.
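On Databricks, RocksDB and its bounded-memory mode are typically enabled with SQL confs along these lines (the memory value is a sketch and is workload-dependent):

```
spark.sql.streaming.stateStore.providerClass org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage true
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 500
```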

Running the Demo: Comparing Micro-Batch to RTM

The run_adtech_rtm_demo notebook contains everything you need to run the demo, which compares a stateful ad attribution streaming query running in RTM against the traditional micro-batch mode. However, you will need to change some configurations to use Kafka (or another supported streaming source/sink). The code includes detailed comments, and we'll cover some highlights here.

Test Harness: AdTechStreamProcessor

import com.demo.data.{AdTechDataHelper}
import com.demo.data.{AdTechStreamProcessor}


// Get the active Spark session
implicit val spark = org.apache.spark.sql.SparkSession.active


// Initialize helper and processor classes for adtech data
val dataHelper = new AdTechDataHelper(kafka_options, payload, schema)
val streamProcessor = new AdTechStreamProcessor(kafka_options, schema)

AdTechDataHelper is defined in the adtechdata_producer_scala notebook and manages the generation of fake data for the stream.

AdTechStreamProcessor is defined in the adtechdata_stream_processor_scala notebook. There you will find the class used for the stateful transformWithState operator (LogRecordProcessor), along with the class (AdTechStreamProcessor) that helps with the benchmarking by establishing the streaming queries. What is striking is that, from a code perspective, using RTM is very simple. AdTechStreamProcessor's processStream method includes this code snippet:

   // Write the processed records to the output Kafka topic
    val query = outputDf.select($"transaction_id".cast("string").alias("key"),
      to_json($"value").alias("value"))
      .writeStream
      .format("kafka")
      .options(kafkaConfig)
      .option("topic", outputTopicName)
      .option("checkpointLocation", checkpointLocation)
      .trigger(
        if (mode == "trigger") Trigger.ProcessingTime("0 minutes")
        else Trigger.RealTime
      )
      .outputMode("update")
      .start()

The .trigger call is the only place in the code where a developer needs to choose between RTM and micro-batch mode. For the benchmark, the processStream method starts the queries and adds timestamps used for measuring latency between the inbound and outbound Kafka streams.

To recap, to use the demo code:

  1. Start with the README notebook.
  2. Adjust Kafka connection details in the util/common notebook.
  3. Use the run_adtech_rtm_demo notebook to run the benchmark code.
  4. Explore the other notebooks in the util folder to see the code responsible for generating the fake data stream (adtechdata_producer_scala) and for providing the stateful streaming query (adtechdata_stream_processor_scala).

Benchmark Results

The results of the benchmark will display in the run_adtech_rtm_demo notebook, like this:

[Screenshot: benchmark latency comparison, as rendered in the run_adtech_rtm_demo notebook]

The benchmark results, shown above, are from a run using DBR 18.0 with three worker nodes (Standard_F8):

{
    "cluster_name": "RTM_testing (compute optimized)",
    "spark_version": "18.0.x-scala2.13",
    "spark_conf": {
        "spark.databricks.streaming.realTimeMode.enabled": "true"
    },
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE"
    },
    "node_type_id": "Standard_F8",
    "driver_node_type_id": "Standard_E8_v3",
    "num_workers": 3
}

Conclusions

Apache Spark’s real-time mode represents a significant architectural evolution, moving Structured Streaming from a platform primarily focused on analytical throughput to one capable of supporting low-latency operational decisioning. This capability is achieved through the fundamental shift to concurrent stage execution, facilitated by the ConcurrentStageDAGScheduler and the specialized MultiShuffleManager for in-memory data transfer.

For developers building these mission-critical systems, successful deployments with stable, predictable latency depend on the following:

  1. Configuration: Activating RTM requires a simple cluster-level configuration.
  2. Capacity Planning: The concurrency constraint—that total task slots must equal or exceed total tasks—prohibits autoscaling and demands proactive capacity reservation sized for peak parallelism. This requires continuous monitoring of input volumes to prevent violation of the sizing prerequisite.
  3. State Management Optimization: Implementing complex correlation logic, such as Ad Attribution, relies on transformWithState for custom, timer-driven state management. This application-level logic must be paired with infrastructure optimization, particularly enabling RocksDB changelog checkpointing, to stabilize p99 latency and prevent periodic I/O spikes during checkpoint events.

By enabling RTM and adopting the recommended best practices, organizations can consolidate their data platforms, eliminating the need for fragmented, specialized streaming engines while consistently delivering low-latency performance for the most demanding operational workloads.