Learn to build fast, stateful pipelines for operational workloads. Discover stateless vs. stateful streams, how to set up your cluster, and more. Get hands-on building a pipeline with code snippets and access to the repo. Start your real-time journey now!
Introduction
Apache Spark Structured Streaming is a scalable, fault-tolerant stream processing engine used in the Databricks Lakehouse Platform. There are two types of Structured Streaming streams: stateless and stateful. Stateful streaming maintains the context of previously processed data across micro-batches or time intervals, which makes it ideal for streaming data with temporal dependencies, while stateless streaming processes each record independently.
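As a toy illustration of the distinction (plain Python, not Spark code), a stateless transform handles each record in isolation, while a stateful one carries a running aggregate across micro-batches:

```python
# Illustrative only: how stateless vs. stateful processing treats the
# same sequence of micro-batches. This simulates the concept, not Spark.

def stateless_transform(batch):
    # Each record is processed independently; nothing is remembered
    # between batches.
    return [event.upper() for event in batch]

class StatefulCounter:
    # Keeps a running count per key across batches, like a streaming
    # aggregation with state.
    def __init__(self):
        self.counts = {}

    def process(self, batch):
        for key in batch:
            self.counts[key] = self.counts.get(key, 0) + 1
        return dict(self.counts)

batches = [["a", "b"], ["a", "a"]]

print([stateless_transform(b) for b in batches])
counter = StatefulCounter()
results = [counter.process(b) for b in batches]
print(results[-1])  # {'a': 3, 'b': 1} -- state carried across batches
```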
When building Structured Streaming pipelines, you have to take into account whether your workload is analytical or operational, as the two call for different architectures.
Analytical processing follows the Medallion architecture, where data is written to a persistent storage format like Delta. This architecture emphasizes the persistence of data, making it a reliable source of truth for analysis. Analytical processing focuses on performing in-depth analysis and computations on top of the data, enabling organizations to gain insights and make informed decisions based on the processed information.
Figure 1. Analytical Workloads (source)
On the other hand, operational processing is used to run critical parts of a business in near-real-time. Unlike analytical processing, operational processing emphasizes timely transformations and actions on the data. This architecture enables organizations to quickly process incoming data, make operational decisions, and trigger immediate actions based on the real-time insights derived from the data.
Figure 2. Operational Workloads (source)
In this guide, we will focus on how to build a time-critical operational Structured Streaming pipeline and the key design considerations that go into it.
Preparing the Environment
As an employee of an online retailer, your objective is to boost sales conversion on the site. To accomplish this, leadership has assigned you the responsibility of creating a near-real-time pipeline. This pipeline will take in page traffic data as input and produce information about the pages experiencing increased traffic as output. The ultimate goal is to enable real-time adjustments to various components of these pages to drive up sales.
In pursuit of this objective, a comprehensive analysis of the site's traffic has been conducted. Following the analysis, the team has reached a consensus that the target will be pages that have accumulated over 5000 clicks within the past 5 seconds. The intention is to focus on these high-traffic pages and implement real-time strategies to increase their sales conversion rate.
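To get a feel for the load this rule implies, it helps to translate it into a throughput figure. A quick back-of-envelope check (plain Python; the page count is a hypothetical number for illustration):

```python
# Back-of-envelope throughput estimate for the 5000-clicks-in-5-seconds rule.
threshold_clicks = 5000
window_seconds = 5

# A single page must sustain at least this click rate to be flagged.
min_clicks_per_second = threshold_clicks / window_seconds
print(min_clicks_per_second)  # 1000.0

# With e.g. 200 candidate pages (a hypothetical figure), the pipeline could
# need to ingest on the order of 200,000 events per second at peak.
pages = 200
peak_events_per_second = pages * min_clicks_per_second
print(peak_events_per_second)  # 200000.0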
The architecture devised incorporates both an analytical and operational processing pipeline, allowing us to accomplish the business goal of achieving near-real-time data processing while still preserving the source of truth for assessing historical trends. Below we can see what this architecture looks like.
Figure 3. Operational and Analytical Processing Architecture
For this guide, we will focus on the operational part of the architecture and demonstrate how we can leverage Spark Structured Streaming to deliver low-latency results. To do so, we will follow the steps below.
Setup
Step 1: Cluster Creation
We will begin by creating the cluster that will run our streaming pipeline. Streaming jobs are generally a compute/CPU-bound activity, which means we should aim for a cluster configuration with a higher CPU count. When the state is expected to be very large, we also have to account for memory, so that the state can be held within the cluster; this avoids spills and the latency that comes with them. Sizing is an exercise that depends on the source data volumes: the goal when choosing the cluster is to increase parallelism with respect to the source. There are other considerations that also need to be taken into account.
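A common heuristic, and it is only a heuristic, is to keep the shuffle partition count at a small multiple of the total core count, so every core stays busy without excessive task overhead. A minimal sizing sketch (the helper function and worker counts are assumptions for illustration):

```python
# Hypothetical sizing helper: pick shuffle partitions proportional to
# the total number of cores in the cluster.
def suggest_shuffle_partitions(workers: int, cores_per_worker: int,
                               factor: int = 1) -> int:
    """Return a partition count that is a multiple of total cluster cores."""
    total_cores = workers * cores_per_worker
    return total_cores * factor

# e.g. 8 workers x 8 cores gives the 64 partitions used in the Spark
# config in the next step.
print(suggest_shuffle_partitions(8, 8))  # 64
```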
Step 2: Spark Config
To extract the best performance from Structured Streaming, here are some Spark configurations for low-latency performance.

# Shuffle partitions proportionate to the number of cores
spark.conf.set("spark.sql.shuffle.partitions", "64")

# Use RocksDB as the state store, keeping large state off the JVM heap
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

# Allow state to be rebalanced across executors
spark.conf.set(
    "spark.sql.streaming.statefulOperator.stateRebalancing.enabled",
    "true",
)

# Checkpoint asynchronously so micro-batches don't block on checkpoint writes
spark.conf.set(
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
    "true"
)

# Persist RocksDB changelogs instead of full snapshots to cut checkpoint cost
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
    "true"
)
Step 3: Source
For our source we will use Kafka. The reasons are that it is the most well-known and supported streaming source, its native implementation in Spark currently allows for faster reads compared to streaming sources that don't rely on the Kafka reader, and it is multi-cloud. When configuring the read stream, there are a few options we care about.
raw_page_clicks = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SSL")
    .option("subscribe", read_topic)
    .option("minPartitions", "64")  # Partitions proportionate to the number of cores
    .load()
)
For our example, we are creating mock source data, and we are keeping it simple by using only the pageId to track traffic. The source JSON will look like the following:
{
"pageId":"5403e8496abf",
"eventTime":"2023-08-08T07:46:40.000Z"
}
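A minimal generator for mock events in this shape might look like the following (plain Python; the helper name and the random hex pageId are assumptions, chosen to match the sample record above):

```python
import json
import secrets
from datetime import datetime, timezone

def make_click_event(page_id=None):
    """Build one mock page-click event as a JSON string."""
    return json.dumps({
        # 12 random hex chars, matching the shape of "5403e8496abf"
        "pageId": page_id or secrets.token_hex(6),
        "eventTime": datetime.now(timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S.000Z"
        ),
    })

print(make_click_event())
```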
Step 4: Transformations
As explained in our environment set-up, we aim to create the transformations that identify when a page is getting a surge of requests: 5000 clicks within the past 5 seconds. For this we will group by the pageId from the JSON source, count how many clicks we got in the past 5 seconds, and send to our sink only the pages that match our requirements.
JSON Source Read
The code below transforms the raw data from Kafka into a new DataFrame by parsing the JSON into a structured DataFrame.
from pyspark.sql import functions as F
from pyspark.sql import types as sTy

page_clicks = raw_page_clicks.select(
    F.from_json(
        F.col("value").cast("string"),
        sTy.StructType(
            [
                sTy.StructField("pageId", sTy.StringType()),
                sTy.StructField("eventTime", sTy.TimestampType()),
            ]
        ),
    ).alias("payload")
).select("payload.pageId", "payload.eventTime")
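Outside Spark, this parse amounts to decoding each Kafka value and pulling out the two fields. A plain-Python sketch of what from_json does per record (the helper is illustrative, not part of the pipeline):

```python
import json
from datetime import datetime

def parse_click(raw_value: bytes) -> dict:
    """Decode one Kafka message value into pageId and eventTime."""
    # Equivalent of casting the value column to string, then parsing JSON
    payload = json.loads(raw_value.decode("utf-8"))
    return {
        "pageId": payload["pageId"],
        # Equivalent of the TimestampType cast in the Spark schema
        "eventTime": datetime.strptime(
            payload["eventTime"], "%Y-%m-%dT%H:%M:%S.%fZ"
        ),
    }

record = parse_click(
    b'{"pageId": "5403e8496abf", "eventTime": "2023-08-08T07:46:40.000Z"}'
)
print(record["pageId"])  # 5403e8496abf
```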
Aggregation
For our example, we count the clicks each page receives within a 5-second window, and if there are more than 5000 we send the page to the sink.
page_clicks = (
    page_clicks
    # Tolerate events up to 5 seconds late before finalizing a window
    .withWatermark("eventTime", "5 seconds")
    # Group clicks per page into 5-second windows
    .groupBy("pageId", F.window("eventTime", "5 seconds"))
    .agg(
        F.count("*").alias("click_count"),
        F.max(F.col("eventTime")).alias("eventTime")
    )
    # Keep only the pages over the traffic threshold
    .filter(F.col("click_count") > 5000)
)
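The windowed count-and-filter logic can be sketched without Spark. A toy tumbling-window version over (pageId, eventTime) tuples (illustrative only; it skips watermarking, and the window alignment assumes a window length that divides a minute):

```python
from collections import defaultdict
from datetime import datetime, timedelta

def flag_hot_pages(events, window_seconds=5, threshold=5000):
    """Count clicks per (pageId, tumbling window); keep pages over threshold.

    events: iterable of (page_id, event_time) tuples. Mirrors the
    groupBy(pageId, window) + count + filter above, minus watermarking.
    """
    counts = defaultdict(int)
    for page_id, event_time in events:
        # Align the event to the start of its tumbling window
        # (exact when window_seconds divides 60, as it does here).
        aligned = event_time.replace(microsecond=0)
        window_start = aligned - timedelta(
            seconds=aligned.second % window_seconds
        )
        counts[(page_id, window_start)] += 1
    return {k: v for k, v in counts.items() if v > threshold}

t0 = datetime(2023, 8, 8, 7, 46, 40)
events = [("page-a", t0 + timedelta(seconds=i % 5)) for i in range(6000)]
events.append(("page-b", t0))  # one lone click; should not be flagged

hot = flag_hot_pages(events)
print({page for page, _ in hot})  # {'page-a'}
```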
Step 5: Sink
For our scenario, we send the output to a Kafka sink. At this point we also checkpoint the state of the stream so that we can keep track of what has already been processed. A note on using Kafka as the sink: it is an at-least-once sink, so there could be duplicates.
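Because delivery is at-least-once, a downstream consumer that needs exactly-once effects can deduplicate on the message key. A minimal sketch (assuming the consumer can keep recently seen keys in memory; not part of the pipeline code):

```python
# Illustrative consumer-side dedup on the Kafka message key.
def dedupe_by_key(messages, seen=None):
    """Yield each (key, value) message once, skipping keys already seen."""
    seen = set() if seen is None else seen
    for key, value in messages:
        if key in seen:
            continue  # duplicate delivery from the at-least-once sink
        seen.add(key)
        yield key, value

msgs = [("k1", "a"), ("k2", "b"), ("k1", "a")]  # k1 delivered twice
print(list(dedupe_by_key(msgs)))  # [('k1', 'a'), ('k2', 'b')]
```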
In the following code, we prep the output for the sink, packaging the result of our aggregations into the value column.
import uuid

# uuidUdf generates a random UUID string per row, used as the Kafka message key
uuidUdf = F.udf(lambda: str(uuid.uuid4()), sTy.StringType())

output = (
    page_clicks.withColumn("key", uuidUdf())
    .withColumn(
        "value",
        F.to_json(
            F.struct(
                F.col("pageId"),
                F.col("click_count"),
                F.col("eventTime")
            )
        ),
    )
    .select("key", "value")
)
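For reference, the shape of one outgoing record, a random UUID key plus a JSON value, can be built the same way in plain Python (the helper and sample values below are illustrative, not pipeline code):

```python
import json
import uuid

def to_sink_record(page_id, click_count, event_time):
    """Mirror the key/value packaging above for a single aggregated row."""
    key = str(uuid.uuid4())  # random key per record, like uuidUdf()
    value = json.dumps({
        "pageId": page_id,
        "click_count": click_count,
        "eventTime": event_time,
    })
    return key, value

key, value = to_sink_record("5403e8496abf", 5123, "2023-08-08T07:46:44.000Z")
print(value)
```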
In the code below, we write to the sink and add the trigger that controls the rate at which the next micro-batch gets executed; as we are interested in 5-second intervals, we set it to that amount of time.
result = (
output.writeStream.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
.option("kafka.security.protocol", "SSL")
.option("checkpointLocation", checkpoint_location)
.option("topic", write_topic)
.trigger(processingTime="5 seconds")
.start()
)
With this, we have finished building the stateful Structured Streaming pipeline, and when we run it we can evaluate its performance and results.
Results
From the image below we can see the time it takes to compute the transformations that identify the pages with the required clicks. This covers everything from when we read from the source to when we write to the sink.
Figure 4. Streaming Workload Performance Metrics
Here is an example of the output sent to the sink, where we only send the pages that have received more than the required clicks.
Figure 5. Streaming Sink Output
Conclusion
In this guide, we have described the difference between operational and analytical workloads and explained how to build an operational processing workload that prioritises performance in its execution. One consideration for this workload: to reduce latency, we do not persist the results of the transformation in Delta. This means that if you also want to store the results, you should build an analytical workload that executes in parallel to this one.
Work to further reduce this latency is ongoing through Project Lightspeed, so keep your eyes peeled for new releases that may improve your performance.
Here is the link to the repository with the notebooks used for this blog.
Special Thanks
Thanks to my team for peer-reviewing the content and special mention to Darius Petrov for helping me gather content for the Blog.