Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
craig_lukasik
Databricks Employee

This article describes an example use case where events from multiple games stream through Kafka and terminate in Delta tables. The example illustrates how to use Delta Live Tables (DLT) to:

  1. Stream from Kafka into a Bronze Delta table.
  2. Consume streaming Protobuf messages with schemas managed by the Confluent Schema Registry, handling schema evolution gracefully.
  3. Demultiplex (demux) messages into multiple game-specific, append-only Silver Streaming Tables. Demux indicates that a single stream is split or fanned out into separate streams.
  4. Create Materialized Views to recalculate aggregate values periodically.

A high-level view of the system architecture is illustrated below.

 

Protobuf DLT.png

 First, let's look at the Delta Live Tables code for the example and the related pipeline DAG so that we can get a glimpse of the simplicity and power of the DLT framework.

Code.png DAG.png

On the left, we see the DLT Python code. On the right, we see the view and the tables created by the code. The bottom cell of the notebook on the left operates on a list of games (GAMES_ARRAY) to dynamically generate the fourteen target tables we see in the DAG.

Before we go deeper into the example code, let's take a step back and review streaming use cases and some streaming payload format options.

 

Streaming overview

Skip this section if you're familiar with streaming use cases, protobuf, the schema registry, and Delta Live Tables. In this article, we'll cover the following topics:

  • Common streaming use cases
    Uncover the diverse streaming data applications in today's tech landscape.
  • Protocol buffers (Protobuf)
    Learn why this fast and compact serialization format is a game-changer for data handling.
  • Delta Live Tables (DLT)
    Discover how DLT pipelines offer a rich, feature-packed platform for your ETL (Extract, Transform, Load) needs.
  • Building a DLT pipeline
    A step-by-step guide on creating a DLT pipeline that seamlessly consumes Protobuf values from an Apache Kafka stream.
  • Utilizing the Confluent Schema Registry
    Understand how this tool is crucial for decoding binary message payloads effectively.
  • Schema evolution in DLT pipelines
    Navigate the complexities of schema evolution within the DLT pipeline framework when streaming protobuf messages with evolving schema.

Common streaming use cases

The Databricks Data Intelligence Platform is a comprehensive data-to-AI enterprise solution that brings data engineers, analysts, and data scientists together on a single platform. Streaming workloads can power near real-time prescriptive and predictive analytics and automatically retrain Machine Learning (ML) models using Databricks built-in MLOps support. The models can be exposed as scalable, serverless REST endpoints, all within the Databricks platform.

The data comprising these streaming workloads may originate from various use cases:

Streaming Data | Use Case
IoT sensors on manufacturing floor equipment | Generating predictive maintenance alerts and preemptive part ordering
Set-top box telemetry | Detecting network instability and dispatching service crews
Player metrics in a game | Calculating leaderboard metrics and detecting cheating

Data in these scenarios is typically streamed through open source messaging systems, which manage the data transfer from producers to consumers. Apache Kafka stands out as a popular choice for handling such payloads. Confluent Kafka and AWS MSK provide robust Kafka solutions for those seeking managed services.

Optimizing the streaming payload format

Databricks provides capabilities that help optimize the AI journey by unifying Business Analysis, Data Science, and Data Analysis activities in a single, governed platform. In your quest to optimize the end-to-end technology stack, a key focus is the serialization format of the message payload. This element is crucial for efficiency and performance. We'll specifically explore an optimized format developed by Google, known as protocol buffers (or "protobuf"), to understand how it enhances the technology stack.

What makes protobuf an optimized serialization format?

Google enumerates the advantages of protocol buffers, including compact data storage, fast parsing, availability in many programming languages, and optimized functionality through automatically generated classes.

A key aspect of optimization usually involves using pre-compiled classes in the consumer and producer programs that a developer typically writes. In a nutshell, consumer and producer programs that leverage protobuf are "aware" of a message schema. Because both sides know the schema, the binary payload carries primitive values identified by compact numeric field tags, removing the need for field names or text delimiters in each message.

Why is protobuf usually painful to work with?

Programs that leverage protobuf must work with classes or modules generated by protoc (the protobuf compiler). Message schemas are written in .proto definition files, and protoc compiles those definitions into classes in various languages, including Java and Python. To learn more about how protocol buffers work, see Google's protocol buffers documentation.

Databricks makes working with protobuf easy

Starting in Databricks Runtime 12.1, Databricks provides native support for serialization and deserialization between Apache Spark struct.... Protobuf support is implemented as an Apache Spark DataFrame transformation and can be used with Structured Streaming or for batch operations. It optionally integrates with the Confluent Schema Registry (a Databricks-exclusive feature). 

Databricks makes it easy to work with protobuf because it handles the protobuf compilation under the hood for the developer. For instance, the data pipeline developer does not have to worry about installing protoc or using it to compile protocol definitions into Python classes.

Exploring payload formats for streaming IoT data

Before we proceed, it is worth mentioning that JSON or Avro may be suitable alternatives for streaming payloads. These formats offer benefits that, for some use cases, may outweigh protobuf. Let's quickly review these formats.

JSON

JSON is an excellent format for development because it is human-readable. The other formats we'll explore are binary formats, which require tools to inspect the underlying data values. Unlike Avro and protobuf, however, a JSON document is stored as a string (potentially compressed), so a value can occupy more bytes than its binary representation needs. Consider the short int value of 8. A short int requires two bytes. In JSON, you may have a document that looks like the following, and it will require roughly 15 to 20 bytes (depending on whitespace) for the associated key, quotes, etc.

 

{
  "my_short": 8
}

 

With protobuf, we expect about 2 bytes for the same field: a small integer such as 8 is encoded as a one-byte varint, preceded by a one-byte tag that identifies the field (for field numbers below 16).
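To make the size comparison concrete, here is a minimal sketch that hand-encodes a single integer field using protobuf's varint wire format and compares it with the JSON encoding. The helper names (encode_varint, encode_int_field) and the choice of field number 1 are assumptions made for illustration; real programs would use generated classes or from_protobuf rather than encoding bytes by hand.

```python
import json


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F          # take the low 7 bits
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow: set the continuation bit
        else:
            out.append(byte)
            return bytes(out)


def encode_int_field(field_number: int, value: int) -> bytes:
    """Encode one integer field: a tag (field number + wire type 0) and a varint value."""
    tag = (field_number << 3) | 0    # wire type 0 means "varint"
    return encode_varint(tag) + encode_varint(value)


# Field number 1 is a hypothetical choice for the "my_short" field.
json_bytes = json.dumps({"my_short": 8}).encode("utf-8")
proto_bytes = encode_int_field(1, 8)

print(len(json_bytes))   # 15
print(len(proto_bytes))  # 2
```

The JSON document repeats the field name in every message, while the protobuf payload carries only the numeric tag and the value.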

JSON support in Databricks
On the positive side, JSON documents have rich benefits when used with Databricks. Databricks Auto Loader can easily transform JSON to a structured DataFrame while also providing built-in support for:

  • Schema inference - when reading JSON into a DataFrame, you can supply a schema so that the target DataFrame or Delta table has the desired schema. Or you can let the engine infer the schema. Alternatively, schema hints can be supplied if you want a balance of those features.
  • Schema evolution - Auto Loader provides options for how a workload should adapt to changes in the schema of incoming files.
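As a rough sketch of how those two features are typically switched on, the snippet below assembles Auto Loader options for a JSON source. The function names, paths, and the hint string are hypothetical; the option keys (cloudFiles.format, cloudFiles.schemaLocation, cloudFiles.schemaHints, cloudFiles.schemaEvolutionMode) are Auto Loader settings, but check the documentation for the values that suit your workload.

```python
def auto_loader_options(schema_location, schema_hints=None, evolution_mode="addNewColumns"):
    """Build an options dict for reading JSON files with Auto Loader."""
    options = {
        "cloudFiles.format": "json",
        # Where Auto Loader persists the inferred schema between runs.
        "cloudFiles.schemaLocation": schema_location,
        # How the stream reacts when new columns appear in incoming files.
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }
    if schema_hints:
        # Pin the types of selected columns and let the rest be inferred.
        options["cloudFiles.schemaHints"] = schema_hints
    return options


def read_json_stream(spark, source_path, options):
    """Return a streaming DataFrame; requires a Databricks Spark session."""
    return spark.readStream.format("cloudFiles").options(**options).load(source_path)


# Hypothetical schema location and hint, for illustration only.
opts = auto_loader_options("/tmp/schemas/events", schema_hints="my_short SMALLINT")
```

On a Databricks cluster, read_json_stream(spark, "/tmp/landing/events", opts) would then return the streaming DataFrame.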

Consuming and processing JSON in Databricks is simple. Creating a Spark DataFrame from JSON files can be as simple as this:

 

df = spark.read.format("json").load("example.json")

 

Avro
Avro is an attractive serialization format because it is compact, embeds schema information in the files themselves, and has built-in support in Databricks that includes schema registry integration. This tutorial, co-authored by Databricks' Angela Chu, walks you through an example that leverages Confluent's Kafka and Schema Registry.

Exploring an Avro-based dataset is as simple as working with JSON:

 

df = spark.read.format("avro").load("example.avro")

 

This datageeks.com article compares Avro and protobuf. It is worth a read if you are on the fence between the two. It describes protobuf as the "fastest amongst all", so if speed outweighs other considerations, such as JSON and Avro's greater simplicity, protobuf may be the best choice for your use case.

 

Example demux pipeline

The source code for the end-to-end example is located on GitHub. The example includes a simulator (Producer), a notebook to install the Delta Live Tables pipeline (Install_DLT_Pipeline), and a Python notebook to process the data that is streaming through Kafka (DLT).

Scenario

Imagine a scenario where a video gaming company is streaming events from game consoles and phone-based games for a number of the games in its portfolio. Imagine the game event messages have a single schema that evolves (i.e., new fields are periodically added). Lastly, imagine that analysts want the data for each game to land in its own Delta Lake table. Some analysts and BI tools need pre-aggregated data, too.

Using DLT, our pipeline will create 1+2N tables: 

  • One table for the raw data (stored in the Bronze view).
  • One Silver Streaming Table for each of the N games, with events streaming through the Bronze table.
  • Each game will also have a Gold Delta table with aggregates based on the associated Silver table.

 

Code walkthrough

Bronze table definition

We'll define the Bronze table (bronze_events) as a DLT view by using the @dlt.view decorator.

 

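import dlt
import pyspark.sql.functions as F
from pyspark.sql.protobuf.functions import from_protobuf

# kafka_options and schema_registry_options are dicts built elsewhere in the repo.
@dlt.view
def bronze_events():
  return (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .load()
    # Decode the binary Kafka value using the schema from the Schema Registry.
    .withColumn('decoded', from_protobuf(F.col("value"), options = schema_registry_options))
    # Flatten the decoded struct into top-level columns.
    .selectExpr("decoded.*")
  )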

 

The repo includes the source code that constructs the values for kafka_options and schema_registry_options. These details are needed so the streaming Delta Live Table can consume messages from the Kafka topic and retrieve the schema from the Confluent Schema Registry (via config values in schema_registry_options). This line of code is what manages the deserialization of the protobuf messages:

 

.withColumn('decoded', from_protobuf(F.col("value"), options = schema_registry_options))

 

The simplicity of transforming a DataFrame with protobuf payload is thanks to this function: from_protobuf (available in Databricks Runtime 12.1 and later). In this article, we don't cover to_protobuf, but the ease of use is the same. The schema_registry_options are used by the function to look up the schema from the Confluent Schema Registry.
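For orientation, here is a hedged sketch of what the two option dictionaries might contain when connecting to Confluent Cloud. The builder function names, the "{topic}-value" subject naming (Confluent's default topic-name strategy), and the placeholder credentials are assumptions; the repo's actual code may differ, and real credentials should come from Databricks Secrets rather than literals.

```python
def build_kafka_options(server, topic, api_key, api_secret):
    """Options for spark.readStream.format("kafka") against a SASL_SSL-secured cluster."""
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f"required username='{api_key}' password='{api_secret}';"
    )
    return {
        "kafka.bootstrap.servers": server,
        "subscribe": topic,
        "startingOffsets": "earliest",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


def build_schema_registry_options(sr_url, sr_api_key, sr_api_secret, topic):
    """Options passed to from_protobuf so it can fetch the schema from the registry."""
    return {
        # Assumes the default subject naming of "<topic>-value".
        "schema.registry.subject": f"{topic}-value",
        "schema.registry.address": sr_url,
        "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
        "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
    }


# Placeholder values for illustration only.
kafka_options = build_kafka_options(
    "mykafka.aws.confluent.cloud:9092", "game_events", "KEY", "SECRET"
)
schema_registry_options = build_schema_registry_options(
    "https://myschemaregistry.aws.confluent.cloud", "SR_KEY", "SR_SECRET", "game_events"
)
```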

Delta Live Tables is a declarative ETL framework that simplifies the development of data pipelines. If you are familiar with Apache Spark Structured Streaming, you may notice the absence of a checkpointLocation (which is required to track the stream's progress so that the stream can be stopped and started without duplicating or dropping data). It is absent because Delta Live Tables manages checkpoints for you out of the box. Delta Live Tables also has other features that help make developers more agile and provide a common framework for ETL across the enterprise. Delta Live Tables Expectations, used for managing data quality, is one such feature.

Silver tables

The following function creates a Silver Streaming Table for the given game name provided as a parameter:

 

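def build_silver(gname):
    # The table name comes from the name= argument, not from the function name.
    @dlt.table(name=f"silver_{gname}_events")
    def silver_events():
        # Keep only the events that belong to this game.
        return dlt.read_stream("bronze_events").where(F.col("game_name") == gname)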

 

Notice the use of the @dlt.table decorator. Thanks to this decorator, when build_silver is invoked for a given gname, a DLT table will be defined that depends on the source bronze_events view. We know that the tables created by this function will be Streaming Tables because of the use of dlt.read_stream.
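To illustrate what demultiplexing means independent of Spark, the following pure-Python sketch routes a list of events into per-game buckets named like the Silver tables. It is an analogy only: the demux function and the sample events are hypothetical, and in the pipeline the same effect is achieved by one filtered streaming read per game.

```python
def demux(events, games):
    """Fan a single sequence of events out into one bucket per game."""
    tables = {f"silver_{game}_events": [] for game in games}
    for event in events:
        target = f"silver_{event['game_name']}_events"
        if target in tables:           # events for unknown games are ignored
            tables[target].append(event)
    return tables


# Hypothetical sample events.
events = [
    {"game_name": "chess", "gamer_id": 1},
    {"game_name": "checkers", "gamer_id": 2},
    {"game_name": "chess", "gamer_id": 3},
]
tables = demux(events, ["chess", "checkers"])
```

Each bucket receives only the events for its game, which mirrors the where(F.col("game_name") == gname) filter.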

Gold tables

The following function creates a Gold Materialized View for the given game name provided as a parameter:

 

def build_gold(gname):
    # The table name comes from the name= argument, not from the function name.
    @dlt.table(name=f"gold_{gname}_player_agg")
    def gold_unified():
        return (
            # dlt.read performs a complete (non-streaming) read of the Silver table.
            dlt.read(f"silver_{gname}_events")
            .groupBy(["gamer_id"])
            .agg(
                F.count("*").alias("session_count"),
                F.min(F.col("event_timestamp")).alias("min_timestamp"),
                F.max(F.col("event_timestamp")).alias("max_timestamp")
            )
        )

 

We know the resulting table will be a "Materialized View" because of the use of dlt.read. This is a simple Materialized View definition; it simply performs a count of source events along with min and max event times, grouped by gamer_id.

Metadata-driven tables

The previous two sections of this article defined functions for creating Silver (Streaming) Tables and Gold Materialized Views. The metadata-driven approach in the example code uses a pipeline input parameter to create N*2 target tables (one Silver table for each game and one aggregate Gold table for each game). This code drives the dynamic table creation using the aforementioned build_silver and build_gold functions:

 

GAMES_ARRAY = spark.conf.get("games").split(",")

for game in GAMES_ARRAY:
    build_silver(game)
    build_gold(game)
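One design detail worth noting is that the loop calls factory functions (build_silver, build_gold) instead of defining the decorated functions directly inside the loop body. The plain-Python sketch below shows why this matters whenever a function defined in a loop is executed later, which we assume is the case for DLT table functions: a closure looks up the loop variable when it runs, not when it is defined. The function names here are hypothetical.

```python
def make_readers_in_loop(games):
    """Define closures directly in the loop: they all share the variable 'game'."""
    readers = []
    for game in games:
        def reader():
            return f"silver_{game}_events"   # 'game' is looked up at call time
        readers.append(reader)
    return readers


def make_reader(game):
    """Factory function: each call gets its own 'game' binding."""
    def reader():
        return f"silver_{game}_events"
    return reader


games = ["chess", "checkers"]
late_bound = [reader() for reader in make_readers_in_loop(games)]
early_bound = [make_reader(game)() for game in games]

print(late_bound)   # ['silver_checkers_events', 'silver_checkers_events']
print(early_bound)  # ['silver_chess_events', 'silver_checkers_events']
```

Passing the game name as a function parameter, as build_silver(gname) does, gives each table definition its own value.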

 

At this point, you might have noticed that much of the control flow code data engineers often have to write is absent.  This is because, as mentioned above, DLT is a declarative programming framework.  It automatically detects dependencies and manages the pipeline's execution flow.  Here's the DAG that DLT creates for the pipeline:

craig_lukasik_3-1709831153588.png

A note about aggregates in a streaming pipeline

For a continuously running stream, calculating some aggregates can be very resource-intensive. Consider a scenario where you must calculate the "median" for a continuous stream of numbers. Every time a new number arrives in the stream, the median calculation will need to explore the entire set of numbers that have ever arrived. In a stream receiving millions of numbers per second, this fact can present a significant challenge if your goal is to provide a destination table for the median of the entire stream of numbers. It becomes impractical to perform such a feat every time a new number arrives. The limits of computation power and persistent storage and network would mean that the stream would continue to grow a backlog much faster than it could perform the calculations.

In a nutshell, it would not work out well if you had such a stream and tried to recalculate the median for the universe of numbers that have ever arrived in the stream. So, what can you do? If you look at the code snippet above, you may notice that this problem is not addressed in the code! Fortunately, as a Delta Live Tables developer, you do not have to worry about it. The declarative framework handles this dilemma by design: DLT materializes the results only periodically instead of on every new event. Furthermore, DLT provides a table property that allows the developer to set an appropriate trigger interval.
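The difference between cheap and expensive streaming aggregates can be sketched in plain Python. Count, min, and max (the aggregates used in the Gold tables) need only constant-size state, whereas an exact median computed the naive way has to retain every value seen so far. The class names are hypothetical and the sketch ignores distribution across a cluster.

```python
import statistics


class RunningStats:
    """Count, min, and max can be updated per event with constant-size state."""

    def __init__(self):
        self.count = 0
        self.minimum = None
        self.maximum = None

    def add(self, value):
        self.count += 1
        self.minimum = value if self.minimum is None else min(self.minimum, value)
        self.maximum = value if self.maximum is None else max(self.maximum, value)


class NaiveMedian:
    """An exact median needs the full history, so state grows with the stream."""

    def __init__(self):
        self.values = []

    def add(self, value):
        self.values.append(value)

    def median(self):
        # Re-examines every value that has ever arrived.
        return statistics.median(self.values)


stats, med = RunningStats(), NaiveMedian()
for number in [5, 1, 9, 3]:
    stats.add(number)
    med.add(number)

print(stats.count, stats.minimum, stats.maximum)  # 4 1 9
print(med.median())                               # 4.0
```

Recomputing on a schedule, rather than per event, bounds how often the expensive path runs.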

 

Reviewing the benefits of DLT

Governance

Unity Catalog governs the end-to-end pipeline. Thus, permissions on target tables can be granted to end users and service principals that need access, across any Databricks workspaces attached to the same metastore.

Lineage

From the Delta Live Tables interface, we can navigate to the Catalog and view lineage.

Click on a table in the DAG. Then click on the "Target table" link.

craig_lukasik_4-1709831153624.png

 

Click on the "Lineage" tab for the table. Then click on the "See lineage graph" link.


Lineage also provides visibility into other related artifacts, such as notebooks, models, etc.

craig_lukasik_5-1709831153026.png

 

This lineage helps accelerate team velocity by making it easier to understand how assets in the workspace are related.

craig_lukasik_6-1709831152996.png

 

Hands-off schema evolution

craig_lukasik_7-1709831152364.png

As the source stream's schema evolves, Delta Live Tables will detect the change, and the pipeline will restart. To simulate a schema evolution for this example, you would run the Producer notebook a subsequent time but with a larger value for num_versions, as shown in the screenshot. This will generate new data where the schema includes some additional columns. The Producer notebook updates the schema details in the Confluent Schema Registry.

When the schema evolves, you will see a pipeline failure like this one:

craig_lukasik_8-1709831153364.png

If the Delta Live Tables pipeline runs in Production mode, a failure will result in an automatic pipeline restart. The Schema Registry will be contacted upon restart to retrieve the latest schema definitions. Once back up, the stream will continue with a new run:

craig_lukasik_9-1709831153186.png

 

Conclusion

In high-performance IoT systems, optimization extends through every layer of the technology stack, including the payload format of messages in transit. Throughout this article, we've delved into the benefits of using an optimized serialization format, protobuf, and demonstrated its integration with Databricks to construct a comprehensive end-to-end demultiplexing pipeline. This approach underlines the importance of selecting the right tools and formats to maximize efficiency and effectiveness in IoT systems.

Instructions for running the example

To run the example code, follow these instructions:

  1. In Databricks, clone this repo: https://github.com/craig-db/protobuf-dlt-schema-evolution.
  2. Set up the prerequisites (documented below).
  3. Follow the instructions in the README notebook included in the repo code.

Prerequisites

  1. A Unity Catalog-enabled workspace – this demo uses a Unity Catalog-enabled Delta Live Tables pipeline. Thus, Unity Catalog should be configured for the workspace where you plan to run the demo. 
  2. As of January 2024, you should use the Preview channel for the Delta Live Tables pipeline. The "Install_DLT_Pipeline" notebook will use the Preview channel when installing the pipeline.
  3. Confluent account – this demo uses Confluent Schema Registry and Confluent Kafka.

Secrets to configure

The following Kafka and Schema Registry connection details (and credentials) should be saved as Databricks Secrets and then set within the Secrets notebook that is part of the repo:

  • SR_URL: Schema Registry URL (e.g. https://myschemaregistry.aws.confluent.cloud)
  • SR_API_KEY: Schema Registry API Key
  • SR_API_SECRET: Schema Registry API Secret
  • KAFKA_KEY: Kafka API Key
  • KAFKA_SECRET: Kafka Secret
  • KAFKA_SERVER: Kafka host:port (e.g. mykafka.aws.confluent.cloud:9092)
  • KAFKA_TOPIC: The Kafka Topic
  • TARGET_SCHEMA: The target database where the streaming data will be appended into a Delta table (the destination table is named unified_gold)
  • CHECKPOINT_LOCATION: Some location (e.g., in DBFS) where the checkpoint data for the streams will be stored

Go here to learn how to save secrets to secure sensitive information (e.g., credentials) within the Databricks Workspace: https://docs.databricks.com/security/secrets/index.html.

Kafka and Schema Registry

The code in the example was written and tested using Confluent's hosted Kafka. To obtain a free trial Confluent account, go here: https://www.confluent.io/confluent-cloud/tryfree/. The code may or may not work seamlessly with other Kafka and Schema Registry providers.

An Important Note about Costs on Confluent Cloud

You should know that charges may be incurred for using this example with Confluent's Kafka & Schema Registry. As of the writing of this article, the free instance of Confluent Cloud allowed for ten partitions. Thus, if you wish to stay within that limit, when you run the Producer notebook, choose "10" as the value for the "Number of Target Delta tables" widget.

1 Comment
Erik
Valued Contributor II

How many streams towards Kafka does this make? 1, or as many as you have gold-tables? When I tried to do the same in pure structured streaming, I ended up with one stream per gold-table, even though in the notebook they share the bronze-layer. This kind of makes sense, since it's only the gold-tables which use a streaming-checkpoint, so that's where their individual state is stored.

My guess is that since your bronze is a view, then you will have the same situation, but maybe DLT can do some magic?