Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
matthew_m
New Contributor III

Learn to build fast, stateful pipelines for operational workloads. Discover stateless vs. stateful streams, how to set up your cluster, and more. Get hands-on building a pipeline with code snippets and access to the repo. Start your real-time journey now!

Introduction

Apache Spark Structured Streaming is a scalable, fault-tolerant stream processing engine used in the Databricks Lakehouse Platform. There are two types of Structured Streaming streams: stateless and stateful. Stateful streaming maintains the context of previously processed data across batches or time intervals, which makes it ideal for streaming data with temporal dependencies, while stateless streaming processes each record independently.

When building Structured Streaming pipelines, you have to take into account whether your workload is analytical or operational as they have different architectures.

Analytical processing follows the Medallion architecture, where data is written to a persistent storage format like Delta. This architecture emphasizes the persistence of data, making it a reliable source of truth for analysis. Analytical processing focuses on performing in-depth analysis and computations on top of the data, enabling organizations to gain insights and make informed decisions based on the processed information.


Figure 1. Analytical Workloads (source)

On the other hand, operational processing is used to run critical parts of a business in near-real-time. Unlike analytical processing, operational processing emphasizes timely transformations and actions on the data. This architecture enables organizations to quickly process incoming data, make operational decisions, and trigger immediate actions based on the real-time insights derived from the data.


Figure 2. Operational Workloads (source)

In this guide, we will focus on how to build a time-critical operational Structured Streaming pipeline and the key design considerations that go into it.

Preparing the Environment

As an employee of an online retailer, your objective is to boost sales conversion on the site. To accomplish this, leadership has asked you to create a near-real-time pipeline that takes page traffic data as input and outputs the pages experiencing increased traffic. The ultimate goal is to enable real-time adjustments to various components of these pages to drive up sales.

In pursuit of this objective, a comprehensive analysis of the site's traffic has been conducted. Following the analysis, the team has reached a consensus that the target will be pages that have accumulated over 5000 clicks within the past 5 seconds. The intention is to focus on these high-traffic pages and implement real-time strategies to increase their sales conversion rate.

The architecture devised incorporates both an analytical and operational processing pipeline, allowing us to accomplish the business goal of achieving near-real-time data processing while still preserving the source of truth for assessing historical trends. Below we can see what this architecture looks like.


Figure 3. Operational and Analytical Processing Architecture

For this guide, we will focus on the operational part of the architecture and demonstrate how Spark Structured Streaming can be used to get low-latency results. To do so, we will follow the steps below.

Setup

Step 1: Cluster Creation
We will begin by creating the cluster that will be used for our streaming pipeline. Streaming jobs are generally compute (CPU) bound, so we should aim to choose a cluster configuration with a higher CPU count. When the state is expected to be very large, we also have to take memory into account to make sure that the state can be held within the cluster, avoiding spills and the latency they add. Sizing is an exercise that depends on the source data volumes. The goal when choosing the cluster is to increase the parallelism with respect to the source. Other considerations that need to be taken into account are listed below.

  • Autoscaling limitations: Structured Streaming currently has no good autoscaling management, and workloads will scale up to max capacity. Delta Live Tables (DLT) comes with enhanced autoscaling to address these limitations. For Structured Streaming, some customers use custom logic to manage cluster sizes: they check batch durations and, if these are above or below a threshold, call the resize API to increase or decrease the cluster size.
  • Compute Type: We recommend the following compute types, depending on your cloud environment, for stateful Structured Streaming.
    • AWS: m5 or i3, the latter for large joins or aggregations
    • Azure: DS_v2 or Ls, the latter for large joins or aggregations
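
The custom autoscaling logic mentioned above can be sketched as a small decision function. This is only an illustration under stated assumptions: the function name, the thresholds (scale up when the average batch duration exceeds the trigger interval, scale down when it is under half of it), and the worker limits are all hypothetical, and the actual call to the resize API is deliberately left out.

```python
from statistics import mean


def target_worker_count(
    batch_durations_s,
    current_workers,
    trigger_interval_s=5.0,  # assumed to match the pipeline's trigger
    min_workers=2,           # hypothetical lower bound
    max_workers=16,          # hypothetical upper bound
):
    """Suggest a cluster size from recent micro-batch durations (seconds)."""
    if not batch_durations_s:
        # No batches observed yet: keep the current size.
        return current_workers

    avg_duration = mean(batch_durations_s)
    if avg_duration > trigger_interval_s:
        # Batches take longer than the trigger interval: we are falling behind.
        proposed = current_workers * 2
    elif avg_duration < 0.5 * trigger_interval_s:
        # Plenty of headroom: release one worker at a time.
        proposed = current_workers - 1
    else:
        proposed = current_workers

    # Never leave the allowed range.
    return max(min_workers, min(max_workers, proposed))
```

A scheduled job could feed this function the durations of the last few micro-batches and only call the resize API when the returned value differs from the current cluster size.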

Step 2: Spark Config
To get the best performance from Structured Streaming, here are some Spark configurations for low latency.

  • Shuffle partitions: Choosing the right number of shuffle partitions gives the best parallelization when running the streaming workload. This is an essential decision because, at the time of writing, changing the number of partitions requires a new checkpoint, so the state would be lost. Two things must be taken into account when choosing the number of partitions:
    • Number of Cores: The number of partitions should be proportional to the number of cores available in the cluster.
    • Size of the State: The number of partitions must consider the expected amount of state stored so that the partition size is not too large. A rule of thumb is to keep each shuffle partition below 200MB.

 

spark.conf.set("spark.sql.shuffle.partitions", "64")  # Proportionate to the number of cores
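
As a rough illustration of how the two criteria above combine, the sketch below picks the smallest multiple of the core count that keeps each partition at or under the 200MB rule of thumb. The helper name and the idea of rounding up to a multiple of the cores are assumptions made for this example, and the expected state size is something you would have to estimate for your own workload.

```python
import math


def suggest_shuffle_partitions(total_cores, expected_state_mb, max_partition_mb=200):
    """Smallest multiple of total_cores that keeps partitions within max_partition_mb."""
    # Minimum number of partitions needed to respect the size limit.
    min_for_state = math.ceil(expected_state_mb / max_partition_mb)
    # Round up to a whole multiple of the core count so every core gets equal work.
    multiples = max(1, math.ceil(min_for_state / total_cores))
    return multiples * total_cores


# With 64 cores and roughly 8 GB of state, one partition per core is enough.
print(suggest_shuffle_partitions(total_cores=64, expected_state_mb=8_000))
# With roughly 20 GB of state, two partitions per core are needed.
print(suggest_shuffle_partitions(total_cores=64, expected_state_mb=20_000))
```

Because the value is tied to the checkpoint, it is worth sizing for the state you expect to reach rather than the state you start with.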

 

  • State store: This is where the state gets stored after being aggregated. It allows the pipeline to keep a context of what has come before without having to create a table for it. We currently recommend RocksDB over HDFSBackedStateStoreProvider, as the latter stores state in JVM memory, which increases GC pressure and the risk of OOM errors over time, reducing performance and requiring restarts to restore it.

 

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

 

  • State rebalancing: As the state gets cached directly in the executors, the task scheduler prefers to send new micro-batches to the executors that processed older micro-batches. This causes new executors that get added to the workload to remain idle. To fix this, we use state rebalancing to reassign tasks to new executors as they get added.

 

spark.conf.set(
  "spark.sql.streaming.statefulOperator.stateRebalancing.enabled",
  "true",
)

 

  • Async checkpoint: In its standard configuration, micro-batches are processed sequentially, meaning that processing of a new micro-batch doesn’t start until the state has been committed. In stateful streaming queries, state updates can therefore become a performance bottleneck. To improve this, the asynchronous checkpoint flag makes the state update asynchronous to the processing of the micro-batch. The tradeoff is slower restarts.

 

spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

 

  • Changelog checkpointing: The aim of this flag is to make the state of a micro-batch durable by syncing the change log instead of snapshotting the entire state to the checkpoint location. Furthermore, the process of snapshotting is pushed to a background task to avoid blocking task execution in the critical path. Any version of the state can be reconstructed by picking a snapshot and replaying the change logs created after that snapshot. This allows for faster and more efficient state checkpointing with the RocksDB state store provider.

 

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
  "true"
)
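
To make the "snapshot plus change log" idea more concrete, here is a toy model in plain Python. It is not how RocksDB or Spark implement it internally; the data layout (a dictionary of snapshots, a dictionary of per-version change lists) and the function name are assumptions chosen purely to illustrate how a given state version can be rebuilt.

```python
def reconstruct_state(snapshots, changelogs, version):
    """Rebuild the state at `version` from the latest snapshot at or before it.

    snapshots:  {version: {key: value}}, full copies taken occasionally
    changelogs: {version: [(op, key, value), ...]}, written for every version
    """
    # Pick the most recent snapshot that is not newer than the target version.
    base_version = max(v for v in snapshots if v <= version)
    state = dict(snapshots[base_version])

    # Replay every change log created after that snapshot, in order.
    for v in range(base_version + 1, version + 1):
        for op, key, value in changelogs[v]:
            if op == "put":
                state[key] = value
            else:  # "delete"
                state.pop(key, None)
    return state


# Illustrative data: a snapshot exists for versions 0 and 2 only.
snapshots = {0: {}, 2: {"page_a": 3, "page_b": 1}}
changelogs = {
    1: [("put", "page_a", 1)],
    2: [("put", "page_a", 3), ("put", "page_b", 1)],
    3: [("put", "page_b", 2)],
    4: [("delete", "page_a", None), ("put", "page_c", 7)],
}
print(reconstruct_state(snapshots, changelogs, 4))
```

Only the small per-version change lists need to be written on the critical path, while the full snapshots can be produced in the background, which is the tradeoff the flag above is described as making.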

 

Step 3: Source
For our source we will be using Kafka. It is the most well-known and widely supported streaming source, it is multi-cloud, and its native implementation in Spark currently allows for faster reads than streaming sources that don’t rely on the Kafka reader. When configuring the read stream, there are a few configurations worth checking.

 

raw_page_clicks = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SSL")
    .option("subscribe", read_topic)
    # Partitions proportionate to the number of cores
    .option("minPartitions", "64")
    .load()
)

 

  • minPartitions: This configuration lets us adjust the number of partitions used to read the source. By default, there is a 1:1 mapping between Kafka topic partitions and Spark partitions. When minPartitions is higher than the number of topic partitions in Kafka, the work of each topic partition is divided into smaller partitions, allowing for improved balancing of the workload.
  • maxOffsetsPerTrigger: By default, Structured Streaming reads all available offsets in a source when triggered, which can lead to a large number of state updates in a single micro-batch. This can cause memory pressure, slow processing times, and even cluster instability in extreme cases. With this configuration, we can keep the performance stable and predictable when a spike of data from the source is received.
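
One possible way to keep both settings in a single place is to build the reader options as a dictionary. The sketch below is an assumption-laden example rather than the configuration used in this guide: the helper name, the broker address, and the rule of deriving maxOffsetsPerTrigger from an events-per-second budget multiplied by the trigger interval are all hypothetical.

```python
def kafka_source_options(
    bootstrap_servers,
    topic,
    total_cores,
    max_events_per_second,
    trigger_interval_s,
):
    """Build Kafka reader options, capping the offsets read per micro-batch."""
    # Cap each micro-batch at what we expect to process within one trigger interval.
    max_offsets = int(max_events_per_second * trigger_interval_s)
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SSL",
        "subscribe": topic,
        "minPartitions": str(total_cores),
        "maxOffsetsPerTrigger": str(max_offsets),
    }


# Hypothetical values for illustration only.
options = kafka_source_options("broker-1:9094", "page_clicks", 64, 100_000, 5)
print(options["maxOffsetsPerTrigger"])

# The dictionary can then be passed to the reader, for example:
#   spark.readStream.format("kafka").options(**options).load()
```

Setting the cap too low makes the stream fall behind during sustained spikes, so the budget should sit comfortably above the normal ingest rate.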

For our example, we are creating mock source data and we are keeping it simple by only using the pageId to track traffic. The source JSON will look like the following:

 

{
  "pageId":"5403e8496abf",
  "eventTime":"2023-08-08T07:46:40.000Z"
}
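
A minimal sketch of how mock events in this shape could be generated is shown below. The generator in the accompanying repository may look different; the function name and the list of page IDs are assumptions made for this example.

```python
import json
import random
from datetime import datetime, timezone


def make_click_event(page_ids, now=None):
    """Return one mock click event as a JSON string."""
    now = now or datetime.now(timezone.utc)
    # Format the timestamp as ISO 8601 with millisecond precision and a Z suffix.
    millis = now.microsecond // 1000
    event_time = now.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"
    return json.dumps({"pageId": random.choice(page_ids), "eventTime": event_time})


# Hypothetical page IDs; each call produces one message value for the Kafka topic.
mock_pages = ["5403e8496abf", "9c1d2e7f0a34"]
print(make_click_event(mock_pages))
```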

 

Step 4: Transformations
As explained when preparing the environment, we want transformations that identify when a page is getting a surge of requests: 5000 clicks in the past 5 seconds. To do this, we group by the pageId from the JSON source, count the events received in each 5-second window, and send to our sink only the pages that meet the threshold.

JSON Source Read

The code below transforms the raw data from Kafka into a new DataFrame by parsing the JSON payload into structured columns.

 

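from pyspark.sql import functions as F
from pyspark.sql import types as sTy

# Parse the Kafka message value as JSON and keep the two fields we need
page_clicks = raw_page_clicks.select(
    F.from_json(
        F.col("value").cast("string"),
        sTy.StructType(
            [
                sTy.StructField("pageId", sTy.StringType()),
                sTy.StructField("eventTime", sTy.TimestampType()),
            ]
        ),
    ).alias("payload")
).select("payload.pageId", "payload.eventTime")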

 


Aggregation

For our example, we count the clicks on each page over 5-second windows, and if a page receives more than 5000 clicks we send it to the sink.

 

page_clicks = (
    page_clicks
    # Watermark on event time: events more than 5 seconds late may be dropped
    .withWatermark("eventTime", "5 seconds")
    # Count clicks per page in 5-second tumbling windows
    .groupBy("pageId", F.window("eventTime", "5 seconds"))
    .agg(
        F.count("*").alias("click_count"),
        F.max(F.col("eventTime")).alias("eventTime"),
    )
    # Keep only the pages above the click threshold
    .filter(F.col("click_count") > 5000)
)
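
To see what the windowed count does without running a cluster, the same logic can be sketched in plain Python. This is a simplified model under stated assumptions: it buckets events into 5-second tumbling windows aligned to the epoch and ignores watermarking and late data entirely. The function name and the low threshold in the example are illustrative only.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_SECONDS = 5


def busy_pages(events, threshold):
    """Count events per (page, 5-second window) and keep counts above threshold.

    events: iterable of (page_id, event_time) with timezone-aware datetimes.
    Returns {(page_id, window_start_epoch_seconds): click_count}.
    """
    counts = Counter()
    for page_id, event_time in events:
        epoch = int(event_time.timestamp())
        # Tumbling windows: each event belongs to exactly one 5-second bucket.
        window_start = epoch - epoch % WINDOW_SECONDS
        counts[(page_id, window_start)] += 1
    return {key: count for key, count in counts.items() if count > threshold}


def at(second):
    """Helper for the example: a timestamp at 07:46:<second> UTC on 2023-08-08."""
    return datetime(2023, 8, 8, 7, 46, second, tzinfo=timezone.utc)


events = [("a", at(40)), ("a", at(41)), ("a", at(44)), ("a", at(45)), ("b", at(42))]
# With a threshold of 2, only page "a" in the window starting at :40 qualifies.
print(busy_pages(events, threshold=2))
```

In the streaming version, the state store holds these per-window counters between micro-batches, and the watermark decides when a window can be finalized and dropped from state.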

 

Step 5: Sink
For our scenario, we are sending the output to a Kafka sink. At this point we also checkpoint the state of the data so that we can keep track of what has already been processed. Note that Kafka is an at-least-once sink, so there could be duplicates.

In the following code, we prep the output for the sink, packaging the result of our aggregations into the value column.

 

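import uuid

# Assumption: uuidUdf is not defined in this post; this is one plausible definition
# that generates a random message key for every row.
uuidUdf = F.udf(lambda: str(uuid.uuid4()), sTy.StringType()).asNondeterministic()

output = (
    page_clicks.withColumn("key", uuidUdf())
    .withColumn(
        "value",
        # Package the aggregation result as a JSON string
        F.to_json(
            F.struct(
                F.col("pageId"),
                F.col("click_count"),
                F.col("eventTime"),
            )
        ),
    )
    .select("key", "value")
)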

 

In the code below we write to the sink and add a trigger, which controls how often the next micro-batch is executed. As we are interested in 5-second intervals, we set the trigger to that amount of time.

 

result = (
    output.writeStream.outputMode("append")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SSL")
    .option("checkpointLocation", checkpoint_location)
    .option("topic", write_topic)
    .trigger(processingTime="5 seconds")
    .start()
)

 

With this, we have finished building the Stateful Structured Streaming pipeline and when we run it we can evaluate the performance and results for it.

Results
From the image below, we can see the time it takes to compute the transformations that identify the pages with the required clicks. This covers the period from when we read from the source up to when we write to the sink.


Figure 4. Streaming Workload Performance Metrics 
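
The same kind of timing can also be checked programmatically. The sketch below assumes a progress dictionary shaped like the one PySpark exposes through the streaming query's lastProgress attribute, where durationMs.triggerExecution is the total time spent on a micro-batch. The function name, the 5-second budget, and the sample numbers are made up for illustration and are not measurements from this pipeline.

```python
def batch_over_budget(progress, budget_ms=5000):
    """Return True when the last micro-batch took longer than the latency budget."""
    if progress is None:
        # No micro-batch has completed yet, so there is nothing to flag.
        return False
    # triggerExecution covers the whole micro-batch, from planning to commit.
    return progress["durationMs"]["triggerExecution"] > budget_ms


# Illustrative progress payload (values are invented for this example).
sample_progress = {
    "batchId": 12,
    "numInputRows": 250000,
    "durationMs": {"addBatch": 900, "triggerExecution": 1200},
}
print(batch_over_budget(sample_progress))

# With the query started above, this could be called as:
#   batch_over_budget(result.lastProgress)
```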

Here is an example of the output sent to the sink where we only send the pages that have gotten more than the required clicks.


Figure 5. Streaming Sink Output

Conclusion
In this guide, we have described the difference between operational and analytical workloads and explained how to build an operational processing workload that prioritises performance in its execution. One consideration for this workload is that, to reduce latency, we are not persisting the results of the transformation in Delta. If you also want to store the results, you should build an analytical workload that executes in parallel to this one.

Work to further reduce this latency is under way through Project Lightspeed, so keep your eyes peeled for new releases that may improve your performance.

Here is the link to the repository with the notebooks used for this blog.

Special Thanks
Thanks to my team for peer-reviewing the content and special mention to Darius Petrov for helping me gather content for the Blog.