Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
Giri-Patcham
Databricks Employee

Summary

  • Understand Spark's StateStore API: Learn how this core component enables robust stateful operations in Structured Streaming through versioned, key-value state management and checkpointing for fault tolerance.
  • Leverage RocksDB for Performance: Discover why the RocksDB-based StateStore provider, especially with changelog checkpointing, is recommended for lower latency and efficient state management in high-volume workloads.
  • Unlock State Inspection with the State Reader API: Explore the new statestore data source, which allows direct Spark SQL access to checkpointed state data and transforms the StateStore from a "black box" into a queryable resource.
  • Improve Debugging and Monitoring: See how the State Reader API facilitates easier debugging of stateful logic, monitoring of state size and distribution, and analysis of application state.

Introduction

Apache Spark's Structured Streaming framework facilitates robust stateful stream processing, allowing applications to persist and update state across micro-batches. At the core of this capability lies the StateStore API, a fundamental component responsible for managing the state efficiently and ensuring fault tolerance. This article provides an in-depth analysis of the StateStore API, examining its underlying architecture, the evolution of its storage backends, mechanisms for accessing and inspecting state (including the powerful new State Reader API), methodologies for debugging, and strategies for optimizing stateful streaming workloads.

The Significance of Stateful Stream Processing

Stateful stream processing is indispensable in modern data-intensive applications, particularly in use cases involving real-time event aggregation (like counting events per user), sessionization (grouping user activity into sessions), stream-stream joins, and anomaly detection. The ability to maintain and update state across time windows or based on unique keys allows for more sophisticated and dynamic analytics, empowering applications to process data with greater contextual awareness. Spark's StateStore API provides a scalable and efficient framework for managing this state, leveraging a combination of in-memory caching and persistent storage techniques to ensure consistency, durability, and fault tolerance.

Understanding the StateStore in Spark Structured Streaming

Defining the StateStore

The StateStore is a critical internal abstraction within Spark Structured Streaming. It essentially provides a versioned key-value store tailored for the needs of stream processing. It maintains the state required for stateful operators like mapGroupsWithState, flatMapGroupsWithState, streaming aggregations, and streaming joins. Each task processing a specific state partition interacts with its local StateStore instance to read, update, or remove state values efficiently across streaming micro-batches.

This key-value store manages transactional consistency between micro-batches, enables atomic updates, and integrates tightly with Spark's checkpointing mechanism for fault recovery. Given its centrality, understanding its internals is crucial for building and troubleshooting scalable streaming applications.

How StateStore Works: Core Concepts

The StateStore manages the state based on versions, where each version corresponds to a processed micro-batch. Key architectural principles include:

  1. Stateful Operators: Operations like aggregations or mapGroupsWithState rely on the StateStore to retrieve the previous state for a given key, compute the new state based on incoming data, and commit the updated state.
  2. Partitioning: The state is partitioned, typically based on the keys being managed (e.g., grouping keys in an aggregation). Each Spark task processes specific state partitions, ensuring locality.
  3. Versioning and Checkpointing: This is the cornerstone of fault tolerance.
    • Each micro-batch completion results in a new version of the state.
    • The StateStore implementation commits changes for each version.
    • Crucially, these state changes are periodically checkpointed to reliable distributed storage (like HDFS, S3, and ADLS). Checkpointing involves saving the state information (either full snapshots or incremental changes/deltas) alongside commit logs.
    • If a failure occurs, Spark can restart the query from the last successful checkpoint, reloading the state from that point and reprocessing subsequent data. This provides exactly-once semantics, provided the source is replayable and the sink is idempotent.
  4. State Management: Includes adding new key-value pairs, updating existing values, and removing state entries (e.g., for timed-out sessions using mapGroupsWithState timeouts or watermarks in aggregations).
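
The versioning model above can be illustrated with a toy, in-memory sketch. This is not Spark's StateStore implementation: the class ToyVersionedStore and its methods are hypothetical names used only for illustration. It shows one committed version per micro-batch, and recovery by restarting from the last committed version after a failed batch.

```python
import copy


class ToyVersionedStore:
    """Toy model: one committed key-value snapshot per micro-batch version."""

    def __init__(self):
        self.committed = {0: {}}   # version -> committed key-value map
        self.latest = 0
        self.working = None        # uncommitted updates for the in-flight batch

    def begin(self, version):
        # Start a batch from the state committed at `version`.
        self.working = copy.deepcopy(self.committed[version])

    def put(self, key, value):
        self.working[key] = value

    def remove(self, key):
        self.working.pop(key, None)

    def commit(self):
        # Committing a batch produces the next version.
        self.latest += 1
        self.committed[self.latest] = self.working
        self.working = None
        return self.latest

    def abort(self):
        # A failed batch leaves all committed versions untouched.
        self.working = None


store = ToyVersionedStore()

# Batch 1: count events per user.
store.begin(0)
store.put("user-a", 2)
store.put("user-b", 1)
v1 = store.commit()

# Batch 2 fails midway; its partial update is discarded.
store.begin(v1)
store.put("user-a", 99)
store.abort()

# Recovery: reprocess batch 2 from the last committed version.
store.begin(store.latest)
store.put("user-a", store.working["user-a"] + 3)
v2 = store.commit()

print(store.committed[v2])  # {'user-a': 5, 'user-b': 1}
```

Because every batch starts from a committed version, the aborted update (99) never leaks into version 2.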

Configurable Backends for State Storage: Performance Matters

Spark's StateStore allows plugging in different storage implementations (StateStoreProviders). The choice significantly impacts performance, latency, and recovery time:

  1. HDFS-backed StateStore (Default - HDFSBackedStateStoreProvider):
    • This is the original implementation and the default provider.
    • State data is first held in an in-memory map and then backed by files in an HDFS-compatible file system.
    • Cons: It can suffer from long JVM GC pauses. With HDFSBackedStateStore, the state data is maintained in the executors' JVM memory, so a large number of state objects puts memory pressure on the JVM and causes long GC pauses.
  2. RocksDB-based StateStore (RocksDBStateStoreProvider):
    • Recommended for most high-performance workloads.
    • Leverages RocksDB, an embedded key-value store optimized for fast storage (SSDs).
    • Efficient Memory Management: Rather than keeping the state in the JVM memory, this solution uses RocksDB to efficiently manage the state in the native memory and the local disk.
    • Changelog Checkpointing: Instead of writing full state snapshots frequently, this highly recommended approach checkpoints only the changes (deltas) made during a batch to the reliable distributed filesystem. This dramatically reduces the I/O needed for checkpointing, leading to faster batch completion times and less overhead. Full snapshots are still persisted periodically in the background, which keeps recovery fast and allows old changelogs to be trimmed.
    • Configuration:
      # Enable RocksDB provider
      spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      
      # Recommended: Enable changelog checkpointing with RocksDB
      spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
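
To illustrate the idea behind changelog checkpointing, the following toy sketch rebuilds state from an occasional full snapshot plus small per-batch change records. It is a simplified model and does not reflect RocksDB's or Spark's actual file formats; apply_changelog, restore, and the record layout are assumptions made for illustration.

```python
def apply_changelog(state, changes):
    """Apply one batch's changes, given as (op, key, value) tuples."""
    for op, key, value in changes:
        if op == "put":
            state[key] = value
        elif op == "delete":
            state.pop(key, None)
    return state


def restore(snapshots, changelogs, target_version):
    """Rebuild state at target_version from the nearest snapshot plus deltas."""
    base = max(v for v in snapshots if v <= target_version)
    state = dict(snapshots[base])
    for version in range(base + 1, target_version + 1):
        apply_changelog(state, changelogs[version])
    return state


# A full snapshot exists only for version 1; later batches wrote deltas.
snapshots = {1: {"ord-1": "pending", "ord-2": "pending"}}
changelogs = {
    2: [("put", "ord-3", "pending")],
    3: [("delete", "ord-1", None), ("put", "ord-2", "paid")],
}

print(restore(snapshots, changelogs, 3))
# {'ord-2': 'paid', 'ord-3': 'pending'}
```

Each batch writes only its own changes, so the per-batch checkpoint cost scales with the number of modified keys rather than with total state size.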


The Challenge: Inspecting the "Black Box"

While incredibly powerful, the StateStore historically acted like a "black box." Debugging stateful logic, understanding why a specific key has a certain value, monitoring state growth, or identifying state skew across partitions was challenging. Developers often had to rely solely on application logs or indirect metrics.

Introducing the State Reader API: Peeking Inside the StateStore

Recognizing these challenges, Spark introduced the State Reader API. This game-changing feature provides a standard Spark SQL DataSource interface to read the checkpointed data of a StateStore directly.

To understand its utility, let's consider a common stateful scenario: joining two streams, like orders and payments. Spark needs to maintain state for orders that haven't received a matching payment yet and vice versa within a specified time window.

Example Setup: Order and Payment Stream Join

First, let's set up two simulated streams using the rate source. One stream represents orders, and the other represents payments. We introduce some variations in keys and timestamps to mimic real-world scenarios where joins might not always succeed immediately or might fall outside defined time bounds.

from pyspark.sql.functions import col, expr

orders_stream = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 2)
    .option("numPartitions", 1)
    .load()
    .withColumn("order_id", expr("CONCAT('ord-', CAST(value AS STRING))"))
    .withColumn("customer_id", expr("CONCAT('cust-', CAST(value % 5 AS STRING))"))
    .withColumn("amount", expr("RAND() * 100"))
    .withColumn("order_time", col("timestamp"))
)

payments_stream = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 2)
    .option("numPartitions", 1)
    .load()
    .withColumn("payment_id", expr("CONCAT('pmt-', CAST(value AS STRING))"))
    .withColumn(
        "order_id",
        expr(
            "CASE "
            + "WHEN value % 10 = 0 THEN CONCAT('missing-', CAST(value AS STRING)) "  # No matching order
            + "WHEN value % 10 = 1 THEN CONCAT('ord-', CAST(value + 5 AS STRING)) "  # Will arrive before order
            + "WHEN value % 10 = 2 THEN CONCAT('ord-', CAST(value - 100 AS STRING)) "  # Order too old
            + "ELSE CONCAT('ord-', CAST(value AS STRING)) "  # Normal case - should join
            + "END"
        ),
    )
    .withColumn("payment_amount", expr("RAND() * 100"))
    .withColumn(
        "payment_time",
        expr(
            "CASE "
            + "WHEN value % 10 = 3 THEN timestamp - INTERVAL 5 MINUTES "  # Payment too old (beyond watermark)
            + "WHEN value % 10 = 4 THEN timestamp + INTERVAL 8 MINUTES "  # Payment too far in future (beyond join window)
            + "ELSE timestamp "  # Normal timestamp
            + "END"
        ),
    )
)

Explanation: We create two DataFrames (orders_stream, payments_stream) reading from the rate source, which generates sequential numbers along with timestamps. We add columns like order_id, customer_id, amount, payment_id, etc., using Spark SQL expressions. Crucially, the payments_stream intentionally creates order_ids and payment_times that might not perfectly align with the orders_stream to simulate real-world data challenges for the join.

Next, we apply watermarks to both streams. Watermarking is essential for stream-stream joins as it allows Spark to define a threshold for how late data can arrive and still be considered for the join state. It also enables Spark to clear out the old state that is no longer needed.

# Apply watermarks
orders_with_watermark = orders_stream \
    .withWatermark("order_time", "2 minutes")

payments_with_watermark = payments_stream \
    .withWatermark("payment_time", "2 minutes")

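Explanation: We define a 2-minute watermark on both order_time and payment_time. This means Spark tolerates data arriving up to 2 minutes behind the latest event time it has seen on that stream. Older rows are treated as too late, and state that can no longer produce a match becomes eligible for cleanup.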
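
As a rough illustration of the watermark arithmetic, here is a toy sketch in plain Python. It is a simplification: Spark updates the watermark between micro-batches and, with several input streams, combines the per-stream watermarks into a global one. The function names and timestamps below are made up for illustration.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=2)  # matches the "2 minutes" used above


def watermark(max_event_time_seen):
    """Toy model: the watermark trails the max event time by the delay."""
    return max_event_time_seen - WATERMARK_DELAY


def is_too_late(event_time, current_watermark):
    """An event older than the watermark is considered too late."""
    return event_time < current_watermark


max_seen = datetime(2025, 1, 1, 12, 10, 0)
wm = watermark(max_seen)  # 12:08:00

on_time = datetime(2025, 1, 1, 12, 9, 0)
late = max_seen - timedelta(minutes=5)  # like the "value % 10 = 3" payments

print(wm.time(), is_too_late(on_time, wm), is_too_late(late, wm))
# 12:08:00 False True
```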

Now, we perform the inner join. We join based on matching order_id and ensure the order_time and payment_time are within a 5-minute interval of each other.

joined = (
    orders_with_watermark.alias("order")
    .join(
        payments_with_watermark.alias("payment"),
        expr(
            """
            order.order_id = payment.order_id AND
            order_time >= payment_time - INTERVAL 5 MINUTES AND
            order_time <= payment_time + INTERVAL 5 MINUTES
        """),
        "inner",
    )
    .select(
        "order.*",
        "payment.payment_id",
        "payment.payment_amount",
        "payment.payment_time",
    )
)


Explanation: This join operation is inherently stateful. When an order arrives, Spark needs to store it in the StateStore while waiting for a potential matching payment within the time constraints. Similarly, when a payment arrives, it's stored while waiting for the corresponding order. The watermarks help Spark eventually clear out orders or payments that never found a match within the allowed time frame.
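
The buffering behavior described above can be sketched with a toy symmetric join in plain Python. This is only a conceptual model, not Spark's symmetricHashJoin operator: ToySymmetricJoin is a hypothetical class, it keeps everything in memory, and it omits watermark-based eviction entirely.

```python
from collections import defaultdict
from datetime import datetime, timedelta

JOIN_WINDOW = timedelta(minutes=5)  # mirrors the 5-minute join condition


class ToySymmetricJoin:
    """Toy model: buffer both sides by key and probe the opposite buffer."""

    def __init__(self):
        self.orders = defaultdict(list)    # order_id -> buffered order times
        self.payments = defaultdict(list)  # order_id -> buffered payment times

    def on_order(self, order_id, order_time):
        self.orders[order_id].append(order_time)
        return [
            (order_id, order_time, payment_time)
            for payment_time in self.payments.get(order_id, [])
            if abs(order_time - payment_time) <= JOIN_WINDOW
        ]

    def on_payment(self, order_id, payment_time):
        self.payments[order_id].append(payment_time)
        return [
            (order_id, order_time, payment_time)
            for order_time in self.orders.get(order_id, [])
            if abs(order_time - payment_time) <= JOIN_WINDOW
        ]


join = ToySymmetricJoin()
t0 = datetime(2025, 1, 1, 12, 0, 0)

print(join.on_order("ord-1", t0))                                 # [] (buffered, no payment yet)
print(len(join.on_payment("ord-1", t0 + timedelta(minutes=1))))   # 1 (matches the buffered order)
print(join.on_payment("ord-2", t0))                               # [] (payment waits for its order)
print(join.on_order("ord-2", t0 + timedelta(minutes=8)))          # [] (outside the 5-minute window)
```

Without eviction, both buffers in this toy grow forever, which is exactly the problem watermarks solve in the real operator.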

Finally, we write the joined stream to a Delta table. This action starts the streaming query and requires specifying a checkpoint location where Spark will manage the state information (including the join state).

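# Write the joined stream to a Delta table (or any sink)
# checkpointPath (checkpoint directory) and table_fqdn (target table name) must be defined beforehand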
query = joined.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpointPath) \
    .queryName("OrderPaymentJoin") \
    .toTable(table_fqdn)

Explanation: The writeStream operation triggers the execution. The checkpointPath is critical – this is where Spark persists the StateStore data (using the configured provider, ideally RocksDB). The state for our join (unmatched orders and payments) resides within this checkpoint directory.

How the State Reader API Helps:

In this scenario, imagine you're debugging why certain orders aren't appearing in the output table.

  • Is the payment arriving too late (past the watermark)?
  • Is the payment arriving too early (before the order, but the order never shows up)?
  • Is there an issue with the order_id format?
  • Is the state growing unexpectedly large?

Without the State Reader API, answering these questions involves complex log analysis or custom metrics. With the State Reader API, you can directly query the state stored in the checkpointPath for the join operation to see exactly which orders and payments are being held in the state, inspect their timestamps, and understand why they haven't been joined or purged yet. This provides direct visibility into the internal workings of the stateful join operation.

Reading State Metadata

Before diving deep into the actual key-value state data, it's often useful to get a high-level overview of the state stores being used by your query. The state-metadata format provides exactly this.

# Read the state metadata from the checkpoint location

state_metadata_df = spark.read \
  .format("state-metadata") \
  .load(checkpointPath)

display(state_metadata_df)

GiriPatcham_0-1747236739861.png

Explanation and Insights:

Running this code reads the metadata associated with the state stores within the query's checkpoint location. The resulting DataFrame provides valuable information about each state store instance used by the streaming query's operators:

  • operatorId / operatorName: Identifies the Spark operator managing this specific state store. In our join example, you'll see symmetricHashJoin. This immediately confirms which parts of your query plan are stateful. Some developers might not realize that operations like stream-stream joins implicitly use the StateStore, and this metadata makes it explicit. In the example output, we see four state stores associated with operator ID 0 (symmetricHashJoin), handling state for both the left and right sides of the join.
  • stateStoreName: A more detailed name for the specific state being managed (e.g., left-keyToNumValues, right-keyWithIndexToValue for the join).
  • numPartitions: Shows the state store's parallelism. This typically corresponds to the number of shuffle partitions (spark.sql.shuffle.partitions, often defaulting to 200). If your keyspace is small or very large, the default parallelism might be inefficient, and this metadata can highlight that. Seeing 200 partitions here confirms that it's using the default setting.
  • minBatchId / maxBatchId: These indicate the range of micro-batch versions for which state can currently be queried from the checkpoint. minBatchId advances as Spark cleans up old state versions (retention is governed by settings such as spark.sql.streaming.minBatchesToRetain), so a value that stays at 0 early in a query's life is expected. Note that this range describes retained checkpoint versions, not whether individual keys are being evicted. To verify that watermarks or custom stateful logic (like flatMapGroupsWithState timeouts) are purging old keys, compare the state contents across batch IDs. State that is never cleaned up grows unbounded, eventually causing performance degradation or failures.

By examining this metadata, you gain quick insights into how your query uses state, its parallelism, and whether state cleanup mechanisms appear to be functioning correctly, all before needing to inspect individual state entries.
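
To see why a small keyspace wastes the default parallelism, consider this toy sketch. It assumes a hypothetical state keyed by only five distinct customer IDs and uses zlib.crc32 as a deterministic stand-in for hash partitioning. Spark uses its own hash function, so the actual partition numbers will differ.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 200  # the default value of spark.sql.shuffle.partitions


def toy_partition(key, num_partitions=NUM_PARTITIONS):
    """Deterministic stand-in for hash partitioning (not Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"cust-{i % 5}" for i in range(1000)]  # only 5 distinct keys
rows_per_partition = Counter(toy_partition(k) for k in keys)

non_empty = len(rows_per_partition)
print(f"{non_empty} of {NUM_PARTITIONS} partitions hold state")
```

With five distinct keys, at most five partitions can ever hold state, so the remaining tasks only add scheduling overhead.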

Reading State Data

Once you have reviewed the metadata and identified a specific state store instance and batch you want to investigate, you can use the statestore format (note the difference from state-metadata) to read the actual key-value data.

First, let's determine the latest batch ID available from the metadata:

# get the max batch ID by reading the state metadata from the checkpoint location

batchId = spark.read.format("state-metadata") \
  .load(checkpointPath).first()["maxBatchId"]


Now, let's read the state for a specific storeName at the latest available batchId:

# Read the actual state data for a specific store and batch
# Options 'joinSide' OR 'storeName' are mandatory when reading state for joins.
# 'storeName' can be retrieved from the state-metadata output.
# 'joinSide' can be "left" or "right".

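# batchId is the latest batch ID retrieved above;
# checkpointPath is the checkpoint location of the streaming query.
state_data_df = spark.read \
  .format("statestore") \
  .option("storeName", "left-keyWithIndexToValue") \
  .option("batchId", batchId) \
  .load(checkpointPath)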

display(state_data_df)

GiriPatcham_1-1747236739839.png

Explanation and Insights:

This code reads the detailed state information using the statestore format. Key points:

  • format("statestore"): Specifies that we want the actual key-value data.
  • option("storeName", ...): This is crucial, especially for joins that use multiple internal state stores. You need to specify which store you want to inspect. The storeName (e.g., left-keyWithIndexToValue) comes directly from the state-metadata output we saw earlier. For simpler stateful operations like aggregation, you might not need these options if there's only one state store per operator.
  • option("batchId", ...): Specifies the exact version (micro-batch) of the state you want to see. Here, we use the latest batch ID retrieved from the state metadata.
  • load(...): Points to the same checkpoint directory used by the streaming query.

The resulting DataFrame (state_data_df) contains:

  • key: A struct representing the key used for state management. For our join example using left-keyWithIndexToValue, the key struct contains the join key (order_id).
  • value: A struct containing the actual data being stored for that key. In the join example, this would be the full row from the left side (orders stream) that is waiting for a match from the right side. You can see fields like timestamp, order_id, customer_id, amount, etc., within the value struct, exactly matching the data from the orders_stream.
  • partition_id: The internal partition number where this state key resides. Useful for diagnosing state skew issues (if too many keys land in the same partition).

This detailed view is invaluable for debugging. If an order ord-536 isn't showing up in the final output, you can use this query to check: Does ord-536 exist in the left-keyWithIndexToValue state store at the latest batch? If yes, it means the order arrived but is still waiting for its corresponding payment within the allowed time window. If it doesn't exist, it might have been purged by the watermark, or perhaps it never arrived or was filtered out earlier. This level of detail helps pinpoint exactly where the data flow is breaking down.
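
For the skew check mentioned under partition_id, one simple heuristic is to compare the largest partition against the average. The sketch below works on plain per-partition row counts, which could be collected from something like state_data_df.groupBy("partition_id").count(). The skew_ratio helper and the sample counts are hypothetical.

```python
from statistics import mean


def skew_ratio(rows_per_partition):
    """Largest partition's row count divided by the mean row count."""
    counts = list(rows_per_partition.values())
    return max(counts) / mean(counts)


# Hypothetical per-partition counts (partition_id -> number of state rows)
balanced = {0: 100, 1: 110, 2: 90, 3: 100}
skewed = {0: 10, 1: 10, 2: 10, 3: 370}

print(round(skew_ratio(balanced), 2))  # 1.1
print(round(skew_ratio(skewed), 2))    # 3.7
```

A ratio close to 1 suggests an even distribution. What counts as "too skewed" depends on the workload, so treat any threshold as a judgment call.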

Debugging Example: Investigating Missing Orders

Let's put the statestore reader into practice for debugging. Suppose we query our final output table for specific orders and see some are there but others are missing:

# Query showing only ord-43 made it to the final table

ordersToVerify = ['ord-43','ord-574','ord-1000','ord-1001']

display(
    spark.read.table(table_fqdn)
    .filter(col("order_id").isin(ordersToVerify))
)

GiriPatcham_2-1747236739582.png

Why did the other three orders not result in a joined output? The State Reader API can help us investigate if these orders are currently held in the join's state store (waiting for payment) or if they were dropped.

To get a more definitive answer, especially for orders found waiting in the state, we load the state from both sides of the join (orders and payments) and perform an outer join directly on the state DataFrames.

df_left = spark.read \
  .format("statestore") \
  .option("storeName", "left-keyWithIndexToValue") \
  .option("batchId", batchId) \
  .load(checkpointPath) 

df_right = spark.read \
  .format("statestore") \
  .option("storeName", "right-keyWithIndexToValue") \
  .option("batchId", batchId) \
  .load(checkpointPath) 

# Joining left and right state stores to find any mismatches

df_joined = df_left.join(
    df_right, df_left["key.field0"] == df_right["key.field0"], "outer"
).select(
    df_left["key.field0"].alias("left_key"),
    df_left["value"].alias("left_value"),
    df_right["key.field0"].alias("right_key"),
    df_right["value"].alias("right_value"),
)

from pyspark.sql.functions import col, expr, when

# If the row is already flagged as matched, keep it as matched.
# Otherwise, check whether payment_time is within 5 minutes (300 seconds) of order_time.
# abs() is evaluated inside the SQL expression, so Python's builtin abs is not involved.

df_matched = df_joined.withColumn(
    'value_matched',
    when(col('left_value.matched') == True, True)
    .when(
        col('left_value.matched') == False,
        expr('abs(unix_timestamp(right_value.payment_time) - unix_timestamp(left_value.order_time)) < 300')
    ).otherwise(False)
)

Analyzing State Store Join Mismatches Directly

Diagnosis 1: If an order ID (e.g., ord-574) IS found in the state

Let's check for orders where both the order and payment exist in the state, but the timestamp condition failed.

  • This tells us the order did arrive in the orders_stream.
  • It is currently being held in the state store because no payment for ord-574 has satisfied the join's time condition (payment_time within 5 minutes of order_time).
  • It also hasn't been dropped by the watermark yet (meaning its order_time is still within the retention window that Spark derives from the 2-minute watermark and the join's time bounds).
  • Conclusion: The order remains in the state until a qualifying payment arrives or the watermark advances far enough for Spark to purge it.
    # Keys matched with the right state store but did not satisfy the join's time condition
    
    display(
        df_matched.filter("value_matched == False")
        .filter("left_value.matched == False")
        .select("left_key", "left_value", "right_key", "right_value")
        .filter(col("left_value.order_id").isin(ordersToVerify))
    )

GiriPatcham_3-1747236739863.png

Explanation: This query isolates rows where both order and payment records were present in the state store simultaneously, but their timestamps were too far apart (> 5 minutes) to satisfy the join condition. The output showing ord-574 confirms this is why it didn't appear in the final joined table up to this batch. It wasn't dropped by the watermark yet, but it couldn't join due to the time difference.

Diagnosis 2: If an order ID IS NOT found in the state
  • This indicates the order is not currently waiting in the join state. There are several possible reasons:
    • Never Arrived/Filtered: The order might not have been generated by the source stream, or it could have been filtered out before the join operation.
    • Watermark Purged: The order did arrive and was held in the state, but no matching payment arrived within the time window before the watermark advanced past the order's order_time + allowed lateness. Spark then correctly purged the old state to prevent it from growing indefinitely.
    • Joined Successfully (but filtered later): Less likely given the initial check, but theoretically possible if there were subsequent filtering steps after the join.
      # A related case the state can show directly: keys that exist in the left state store with no match
      # in the right state store, because the payment was dropped by the watermark or never arrived
      
      display(
          df_matched.filter("right_key is null")
          .select("left_key", "left_value", "right_key", "right_value")
          .filter(col("left_value.order_id").isin(ordersToVerify))
      )

GiriPatcham_4-1747236739861.png

Explanation: This query isolates rows where the order (left_key) exists in the state, but the payment (right_key) is null for this batch. The output showing ord-1000 and ord-1001 confirms these orders arrived but are waiting for a payment that hasn't arrived or hasn't been processed into the state yet (or was perhaps dropped by the payment watermark earlier). The inner join failed for these because the matching payment was absent from the state store.

Debugging Summary:

This detailed walkthrough demonstrates the power of the State Reader API. By querying the state metadata and the state data itself, we precisely diagnosed why each missing order failed to join:

  • ord-574: Both order and payment were present in the state, but their timestamps were too far apart.
  • ord-1000 & ord-1001: The orders were present in the state, but their corresponding payments were missing from the state store at that time.

This level of introspection transforms debugging stateful streaming applications from guesswork based on logs to a data-driven analysis of the application's internal state.
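
The decision logic behind these diagnoses can be summarized in a small, Spark-free sketch. The dictionaries below stand in for the order and payment state (order_id mapped to a timestamp) and use made-up timestamps. The diagnose function is a hypothetical helper, not part of the State Reader API.

```python
from datetime import datetime, timedelta

JOIN_WINDOW = timedelta(minutes=5)  # mirrors the join condition


def diagnose(order_id, left_state, right_state):
    """Classify why an order has not produced a joined row (toy model)."""
    order_time = left_state.get(order_id)
    payment_time = right_state.get(order_id)
    if order_time is None:
        return "order not in state (purged, never arrived, or filtered)"
    if payment_time is None:
        return "order waiting: no payment in state"
    if abs(payment_time - order_time) > JOIN_WINDOW:
        return "both in state: timestamps outside the join window"
    return "join condition satisfied"


t0 = datetime(2025, 1, 1, 12, 0, 0)
left_state = {"ord-574": t0, "ord-1000": t0, "ord-1001": t0}   # hypothetical order times
right_state = {"ord-574": t0 + timedelta(minutes=8)}           # hypothetical payment times

for order_id in ["ord-574", "ord-1000", "ord-1001", "ord-9999"]:
    print(order_id, "->", diagnose(order_id, left_state, right_state))
```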

Key Use Cases for the State Reader API:

  1. Debugging: Directly inspect the value associated with specific keys (like order_id in the join state) to understand application logic errors. Verify if state cleanup (timeouts/watermarks) is working as expected by checking if old keys are being removed.
  2. Monitoring & Analysis: Track the total number of state keys (e.g., unmatched orders/payments), analyze state size growth over time, identify potential state skew (uneven distribution of keys across partitions), and understand the distribution of state values.
  3. State Migration/Analysis: Potentially use the state data for offline analysis or seeding state in other systems (use with caution regarding compatibility).

Conclusion

The StateStore API constitutes a foundational component of Spark’s Structured Streaming paradigm, providing a robust mechanism for managing stateful computations. By comprehending its internal workings, leveraging advanced configuration options, and employing systematic debugging strategies, developers can construct scalable and efficient stateful streaming applications. As the field of stateful stream processing continues to evolve, adopting emerging best practices and optimizing StateStore configurations will be crucial in achieving high-performance real-time analytics.

For a more comprehensive exploration, refer to the official Spark documentation and the Databricks technical guides.

The complete code used in this blog is available here.

References