This article describes an example use case where events from multiple games stream through Kafka and terminate in Delta tables. The example illustrates how to use Delta Live Tables (DLT) to consume protobuf-encoded events from Kafka, deserialize them with the help of the Confluent Schema Registry, and demultiplex the single event stream into per-game Silver tables and Gold aggregates.
A high-level view of the system architecture is illustrated below.
First, let's look at the Delta Live Tables code for the example and the related pipeline DAG so that we can get a glimpse of the simplicity and power of the DLT framework.
On the left, we see the DLT Python code. On the right, we see the view and the tables created by the code. The bottom cell of the notebook on the left operates on a list of games (GAMES_ARRAY) to dynamically generate the fourteen target tables we see in the DAG.
Before we go deeper into the example code, let's take a step back and review streaming use cases and some streaming payload format options.
Skip this section if you're already familiar with streaming use cases, protobuf, the schema registry, and Delta Live Tables. Otherwise, the next few sections provide a brief overview of each.
The Databricks Data Intelligence Platform is a comprehensive data-to-AI enterprise solution that brings data engineers, analysts, and data scientists together on a single platform. Streaming workloads can power near real-time prescriptive and predictive analytics and automatically retrain Machine Learning (ML) models using Databricks built-in MLOps support. The models can be exposed as scalable, serverless REST endpoints, all within the Databricks platform.
The data comprising these streaming workloads may originate from various use cases:
Streaming Data | Use Case
IoT sensors on manufacturing floor equipment | Generating predictive maintenance alerts and preemptive part ordering
Set-top box telemetry | Detecting network instability and dispatching service crews
Player metrics in a game | Calculating leader-board metrics and detecting cheating
Data in these scenarios is typically streamed through open source messaging systems, which manage the data transfer from producers to consumers. Apache Kafka stands out as a popular choice for handling such payloads. Confluent Kafka and AWS MSK provide robust Kafka solutions for those seeking managed services.
Databricks provides capabilities that help optimize the AI journey by unifying Business Analysis, Data Science, and Data Analysis activities in a single, governed platform. In your quest to optimize the end-to-end technology stack, a key focus is the serialization format of the message payload. This element is crucial for efficiency and performance. We'll specifically explore an optimized format developed by Google, known as protocol buffers (or "protobuf"), to understand how it enhances the technology stack.
Google enumerates the advantages of protocol buffers, including compact data storage, fast parsing, availability in many programming languages, and optimized functionality through automatically generated classes.
A key aspect of optimization usually involves using pre-compiled classes in the consumer and producer programs that a developer typically writes. In a nutshell, consumer and producer programs that leverage protobuf are "aware" of a message schema, and the binary payload of a protobuf message benefits from primitive data types and positioning within the binary message, removing the need for field markers or delimiters.
Programs that leverage protobuf must work with classes or modules compiled using protoc (the protobuf compiler). The protoc compiler compiles those definitions into classes in various languages, including Java and Python. To learn more about how protocol buffers work, go here.
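To make this concrete, here is a minimal sketch of how a producer written in Python might use a protoc-generated module. The game_event_pb2 module and its GameEvent fields are hypothetical stand-ins for whatever .proto definition your team compiles; SerializeToString and ParseFromString are the standard protobuf Python APIs.

# Hypothetical module generated by, e.g.: protoc --python_out=. game_event.proto
from game_event_pb2 import GameEvent

# Populate a strongly typed message object; the field names come from the .proto schema.
event = GameEvent()
event.gamer_id = "player-42"
event.game_name = "island_escape"

# Serialize to a compact binary payload suitable for use as a Kafka message value.
payload = event.SerializeToString()

# A consumer holding the same generated class can parse the bytes back into an object.
restored = GameEvent()
restored.ParseFromString(payload)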
Starting in Databricks Runtime 12.1, Databricks provides native support for serialization and deserialization between Apache Spark struct columns and protobuf messages. Protobuf support is implemented as an Apache Spark DataFrame transformation and can be used with Structured Streaming or for batch operations. It optionally integrates with the Confluent Schema Registry (a Databricks-exclusive feature).
Databricks makes it easy to work with protobuf because it handles the protobuf compilation under the hood for the developer. For instance, the data pipeline developer does not have to worry about installing protoc or using it to compile protocol definitions into Python classes.
Before we proceed, it is worth mentioning that JSON or Avro may be suitable alternatives for streaming payloads. These formats offer benefits that, for some use cases, may outweigh those of protobuf. Let's quickly review these formats.
JSON
JSON is an excellent format for development because it is primarily human-readable. The other formats we'll explore are binary formats, which require tools to inspect the underlying data values. Unlike Avro and protobuf, however, a JSON document is stored as a large string (potentially compressed), meaning it can consume far more bytes than the values it encodes actually require. Consider the short int value of 8. A short int requires two bytes. In JSON, you may have a document that looks like the following, and it will require roughly 20 bytes for the associated key, quotes, braces, and whitespace.
{
"my_short": 8
}
When we consider protobuf, we expect 2 bytes plus a few more for the overhead related to the positioning metadata.
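As a quick, informal check (a sketch, not a benchmark), you can measure the JSON overhead directly; the protobuf figure is the conceptual tag-plus-varint encoding described above.

import json

# Every JSON message carries the key name, quotes, and braces along with the value.
doc = {"my_short": 8}
json_size = len(json.dumps(doc).encode("utf-8"))
print(f"JSON payload size: {json_size} bytes")  # far more than the two bytes the short itself needs

# In protobuf, the field name lives in the compiled schema rather than the payload:
# the value 8 encodes as roughly one byte of field tag plus one byte of varint.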
JSON support in Databricks
On the positive side, JSON documents have rich benefits when used with Databricks. Databricks Auto Loader can easily transform JSON into a structured DataFrame while also providing built-in support for schema inference and schema evolution.
Consuming and processing JSON in Databricks is straightforward. Creating a Spark DataFrame from JSON files can be as simple as this:
df = spark.read.format("json").load("example.json")
Avro
Avro is an attractive serialization format because it is compact, encompasses schema information in the files themselves, and has built-in support in Databricks that includes schema registry integration. This tutorial, co-authored by Databricks' Angela Chu, walks you through an example that leverages Confluent's Kafka and Schema Registry.
To explore an Avro-based dataset, it is as simple as working with JSON:
df = spark.read.format("avro").load("example.avro")
This datageeks.com article compares Avro and protobuf. It is worth a read if you are on the fence between the two. It describes protobuf as the "fastest amongst all," so if speed outweighs other considerations, such as the greater simplicity of JSON and Avro, protobuf may be the best choice for your use case.
The source code for the end-to-end example is located on GitHub. The example includes a simulator (Producer), a notebook to install the Delta Live Tables pipeline (Install_DLT_Pipeline), and a Python notebook to process the data that is streaming through Kafka (DLT).
Imagine a scenario where a video gaming company is streaming events from game consoles and phone-based games for a number of the games in its portfolio. Imagine the game event messages have a single schema that evolves (i.e., new fields are periodically added). Lastly, imagine that analysts want the data for each game to land in its own Delta Lake table. Some analysts and BI tools need pre-aggregated data, too.
Using DLT, our pipeline will create 1+2N datasets: a single Bronze view over the raw Kafka stream and, for each of the N games, one Silver Streaming Table and one Gold Materialized View.
We'll define the Bronze table (bronze_events) as a DLT view by using the @dlt.view annotation.
import dlt
import pyspark.sql.functions as F
from pyspark.sql.protobuf.functions import from_protobuf

@dlt.view
def bronze_events():
    # Read raw protobuf payloads from Kafka and decode them using the schema registry.
    return (
        spark.readStream.format("kafka")
        .options(**kafka_options)
        .load()
        .withColumn('decoded', from_protobuf(F.col("value"), options = schema_registry_options))
        .selectExpr("decoded.*")
    )
The repo includes the source code that constructs values for kafka_options. These details are needed so the streaming Delta Live Table can consume messages from the Kafka topic and retrieve the schema from the Confluent Schema registry (via config values in schema_registry_options). This line of code is what manages the deserialization of the protobuf messages:
.withColumn('decoded', from_protobuf(F.col("value"), options = schema_registry_options))
The simplicity of transforming a DataFrame with protobuf payload is thanks to this function: from_protobuf (available in Databricks Runtime 12.1 and later). In this article, we don't cover to_protobuf, but the ease of use is the same. The schema_registry_options are used by the function to look up the schema from the Confluent Schema Registry.
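For orientation, the two option dictionaries referenced above might look roughly like the sketch below. The keys follow the patterns in the Databricks and Confluent documentation, but the endpoint, topic, and credential values are placeholders; consult the repo's source code for the authoritative settings.

# Hypothetical Confluent Cloud connection settings; real values should come from Databricks Secrets.
kafka_options = {
    "kafka.bootstrap.servers": "<bootstrap-server>:9092",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username='<kafka-api-key>' password='<kafka-api-secret>';"
    ),
    "subscribe": "<topic-name>",
    "startingOffsets": "earliest",
}

# Options from_protobuf uses to fetch the message schema from the Confluent Schema Registry.
schema_registry_options = {
    "schema.registry.subject": "<topic-name>-value",
    "schema.registry.address": "https://<schema-registry-endpoint>",
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",
}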
Delta Live Tables is a declarative ETL framework that simplifies the development of data pipelines. If you are familiar with Apache Spark Structured Streaming, you may notice the absence of a checkpointLocation (which is normally required to track the stream's progress so that the stream can be stopped and restarted without duplicating or dropping data). It is absent because Delta Live Tables manages checkpointing out of the box for you. Delta Live Tables also has other features that help make developers more agile and provide a common framework for ETL across the enterprise. Delta Live Tables Expectations, used for managing data quality, is one such feature.
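As a brief, hypothetical illustration (not part of the example repo), an expectation is attached to a dataset with a decorator; dlt.expect_or_drop is part of the DLT Python API, while the rule name, condition, and table name below are invented for this sketch.

@dlt.table(name="bronze_events_validated")
@dlt.expect_or_drop("valid_gamer_id", "gamer_id IS NOT NULL")
def bronze_events_validated():
    # Rows that fail the expectation are dropped and tracked in the pipeline's data quality metrics.
    return dlt.read_stream("bronze_events")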
The following function creates a Silver Streaming Table for the given game name provided as a parameter:
def build_silver(gname):
    @dlt.table(name=f"silver_{gname}_events")
    def silver_events():
        return dlt.read_stream("bronze_events").where(F.col("game_name") == gname)
Notice the use of the @dlt.table annotation. Thanks to this annotation, when build_silver is invoked for a given gname, a DLT table will be defined that depends on the source bronze_events view. We know that the tables created by this function will be Streaming Tables because of the use of dlt.read_stream.
The following function creates a Gold Materialized View for the given game name provided as a parameter:
def build_gold(gname):
    @dlt.table(name=f"gold_{gname}_player_agg")
    def gold_player_agg():
        return (
            dlt.read(f"silver_{gname}_events")
            .groupBy(["gamer_id"])
            .agg(
                F.count("*").alias("session_count"),
                F.min(F.col("event_timestamp")).alias("min_timestamp"),
                F.max(F.col("event_timestamp")).alias("max_timestamp")
            )
        )
We know the resulting table will be a "Materialized View" because of the use of dlt.read. This is a simple Materialized View definition; it simply performs a count of source events along with min and max event times, grouped by gamer_id.
The previous two sections of this article defined functions for creating Silver (Streaming) Tables and Gold Materialized Views. The metadata-driven approach in the example code uses a pipeline input parameter to create N*2 target tables (one Silver table for each game and one aggregate Gold table for each game). This code drives the dynamic table creation using the aforementioned build_silver and build_gold functions:
GAMES_ARRAY = spark.conf.get("games").split(",")

for game in GAMES_ARRAY:
    build_silver(game)
    build_gold(game)
At this point, you might have noticed that much of the control flow code data engineers often have to write is absent. This is because, as mentioned above, DLT is a declarative programming framework. It automatically detects dependencies and manages the pipeline's execution flow. Here's the DAG that DLT creates for the pipeline:
For a continuously running stream, calculating some aggregates can be very resource-intensive. Consider a scenario where you must calculate the median for a continuous stream of numbers. Every time a new number arrives, the median calculation needs to revisit the entire set of numbers that have ever arrived. In a stream receiving millions of numbers per second, this presents a significant challenge if your goal is to maintain a destination table holding the median of the entire stream. It becomes impractical to perform such a recalculation every time a new number arrives; the limits of compute power, persistent storage, and network bandwidth would mean the stream builds a backlog far faster than the calculations could complete.
In a nutshell, it would not work out well if you had such a stream and tried to recalculate the median over every number that has ever arrived. So, what can you do? If you look at the code snippet above, you may notice that this problem is not addressed in the code! Fortunately, as a Delta Live Tables developer, you do not have to worry about it: the declarative framework handles this dilemma by design, materializing results only periodically. Furthermore, DLT provides a table property that allows the developer to set an appropriate trigger interval.
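For example, the trigger interval can be set per table through the pipelines.trigger.interval table property; the table name and interval below are hypothetical, and dlt and F are imported as in the earlier snippets.

@dlt.table(
    name="gold_example_game_player_agg",
    # Materialize this aggregate roughly once per minute instead of continuously.
    table_properties={"pipelines.trigger.interval": "60 seconds"}
)
def gold_example_game_player_agg():
    return (
        dlt.read("silver_example_game_events")
        .groupBy("gamer_id")
        .agg(F.count("*").alias("session_count"))
    )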
Unity Catalog governs the end-to-end pipeline. Thus, permissions on the target tables can be granted to end users and service principals needing access across any Databricks workspaces attached to the same metastore.
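For instance, read access on one of the Gold tables could be granted to an analyst group with standard Unity Catalog SQL; the catalog, schema, table, and group names below are placeholders.

# Grant read access on a Gold table to an analyst group (names are illustrative only).
spark.sql("GRANT SELECT ON TABLE main.games.gold_example_game_player_agg TO `game-analysts`")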
From the Delta Live Tables interface, we can navigate to the Catalog and view lineage.
Click on a table in the DAG, then click the "Target table" link.
Click on the "Lineage" tab for the table, then click the "See lineage graph" link. Lineage also provides visibility into other related artifacts, such as notebooks, models, etc.
This lineage helps accelerate team velocity by making it easier to understand how assets in the workspace are related.
As the source stream's schema evolves, Delta Live Tables will detect the change and the pipeline will restart. To simulate a schema evolution for this example, run the Producer notebook a second time with a larger value for num_versions. This generates new data whose schema includes some additional columns, and the Producer notebook updates the schema details in the Confluent Schema Registry.
When the schema evolves, you will see a pipeline failure like this one:
If the Delta Live Tables pipeline runs in Production mode, a failure will result in an automatic pipeline restart. The Schema Registry will be contacted upon restart to retrieve the latest schema definitions. Once back up, the stream will continue with a new run:
In high-performance IoT systems, optimization extends through every layer of the technology stack, including the payload format of messages in transit. Throughout this article, we've delved into the benefits of using an optimized serialization format, protobuf, and demonstrated its integration with Databricks to construct a comprehensive end-to-end demultiplexing pipeline. This approach underlines the importance of selecting the right tools and formats to maximize efficiency and effectiveness in IoT systems.
To run the example code, follow these instructions:
The following Kafka and Schema Registry connection details (and credentials) should be saved as Databricks Secrets and then set within the Secrets notebook that is part of the repo:
Go here to learn how to save secrets to secure sensitive information (e.g., credentials) within the Databricks Workspace: https://docs.databricks.com/security/secrets/index.html.
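Once stored, secrets can be read at runtime with dbutils.secrets.get; the scope and key names below are placeholders that should match whatever you configure in the Secrets notebook.

# Retrieve connection credentials without hard-coding them in the notebook.
kafka_api_key = dbutils.secrets.get(scope="protobuf-demo", key="kafka-api-key")
kafka_api_secret = dbutils.secrets.get(scope="protobuf-demo", key="kafka-api-secret")
schema_registry_key = dbutils.secrets.get(scope="protobuf-demo", key="sr-api-key")
schema_registry_secret = dbutils.secrets.get(scope="protobuf-demo", key="sr-api-secret")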
The code in the example was written and tested using Confluent's hosted Kafka. To obtain a free trial Confluent account, go here: https://www.confluent.io/confluent-cloud/tryfree/. The code may or may not work seamlessly with other Kafka and Schema Registry providers.
You should know that charges may be incurred for using this example with Confluent's Kafka & Schema Registry. As of the writing of this article, the free instance of Confluent Cloud allowed for ten partitions. Thus, if you wish to stay within that limit, when you run the Producer notebook, choose "10" as the value for the "Number of Target Delta tables" widget.