
Introduction

Modern vehicles generate large streams of data that can be harnessed in real time to boost efficiency, optimize costs, and minimize environmental impact. By leveraging this data across a fleet of vehicles at scale, we can achieve even greater benefits, enabling more effective fleet management and optimization.

When it comes to building real-time streaming applications from IoT devices, including vehicle fleet telemetry and other edge devices, Protobuf (Protocol Buffers) has emerged as a popular choice for companies. Its compact, efficient binary format ensures low latency and high throughput, making it ideal for handling critical data, enabling real-time decision-making, and optimizing operations.

Traditional approaches to managing Protobuf events can often introduce a heavy burden on Data Engineering teams, requiring specialized skills that are rare and costly. This increases the ongoing expenses related to maintaining proto packages, system upgrades, and infrastructure. Additionally, data integrity issues during event replays add further complexity, making it challenging for businesses to manage streaming applications efficiently and cost-effectively. This highlights the need for a simple, reliable, and performant solution that minimizes the dependency on technical resources. 

This blog explores how you can harness the capabilities of Serverless Delta Live Tables (DLT) pipelines to transform raw IoT telemetry data from fleets into powerful insights. With native Protobuf support, Databricks greatly simplifies serializing and deserializing Protobuf events to enable these insights. Furthermore, Serverless lets your data teams focus on high-value activities like extracting insights while Databricks efficiently manages compute resources, including optimizing and scaling compute for workloads.

 


Use Case: Optimize operations for your entire vehicle fleet 

Company XYZ is a hypothetical logistics provider created to demonstrate real-time vehicle fleet analytics with Databricks Delta Live Tables. They operate thousands of vehicles across various routes daily, facing challenges in optimizing fuel efficiency and monitoring vehicle utilization. Excessive idle times and unmonitored fuel consumption can lead to higher operational costs.

To address these challenges, they leverage real-time data analytics using Databricks Delta Live Tables (DLT) to gain actionable insights from their fleet telemetry data, enabling them to optimize vehicle usage and reduce fuel expenses effectively.

IoT devices in vehicles can transmit telemetry data such as location, speed, fuel level, and engine status. These data points can power insights into fuel efficiency and vehicle utilization, which are essential metrics for optimizing fleet operations. 

The high-level architecture diagram below illustrates the end-to-end implementation of a streaming solution using Kafka and the Databricks Serverless DLT pipeline. 

 

High-level Architecture.jpg

 

A data producer generates fleet telemetry events into a Kafka topic in the Protobuf format. In real-world architectures, this data producer is replaced by the vehicle fleet telemetry IoT devices, which send events to a centralized system such as a gRPC server. This system packages and compacts the individual events into batch messages before transmitting them to Kafka. The Serverless DLT pipeline continuously ingests those events from the Kafka topic, deserializes them using the from_protobuf function, and populates the following tables in the medallion architecture.

  • The bronze_fleet_events_raw table stores the raw events received from the Kafka topic.
  • The bronze_fleet_events table stores the deserialized Protobuf messages.
  • The silver_fleet_events table is an SCD Type-1 table that handles event replays from edge devices to ensure data integrity.
  • The gold materialized views gold_fuel_efficiency and gold_vehicle_utilization, aggregations built on top of the Silver table, are used for downstream reporting and analytics.

These Gold materialized views are critical for downstream analytics. They provide insights into fuel efficiency and vehicle utilization, which are essential metrics for optimizing fleet operations. 

 

Note!

This blog uses Kafka to efficiently manage real-time fleet telemetry data, ensuring reliable streaming and handling of Protobuf-serialized messages. 

 

Serverless Delta Live Tables

When you build your DLT pipeline, you are presented with a visual representation of the flows and dependencies as a Directed Acyclic Graph (DAG). This helps track the flow of data from raw ingestion to the final stages of processing.

The Protobuf deserialization occurs while populating the bronze_fleet_events table, which serves as the source for performing change data capture into the Silver table silver_fleet_events. Refer to the detailed sections below for each table to understand the data flow within this pipeline.

DLT-Pipeline-Graph.png

Leveraging the Serverless option for Delta Live Tables offers several key advantages, making it an ideal choice for efficiently processing Protobuf data without the overhead of managing infrastructure. It provides a simple, easy-to-use experience and builds a performant, auto-optimized, and fully managed data pipeline that decreases overall TCO. You can enable Serverless DLT by navigating to the pipeline settings and ticking the "Serverless" option.

DLT-Pipeline-Settings.png

For more detailed insights on the benefits of using serverless DLT, including how it enables cost-effective, incremental ETL pipelines, refer to this latest blog: "Cost-effective, incremental ETL with serverless compute for Delta Live Tables pipelines".

 

The 'Raw' Protobuf Event Data

In high-throughput scenarios, Protobuf messages are often transmitted in consolidated batches rather than as individual events. These streams are also prone to data repetition, because edge devices may replay past events.

Data is typically structured using a Protobuf schema defined in a .proto file; an example is shown below. The schema includes two message types: fleetevent and fleetevents.

fleet_management.proto

syntax = "proto3";
package blog.fleet.tracking;
 
message fleetevent {
  string vehicle_id = 1;
  int64 timestamp = 2;
  double latitude = 3;
  double longitude = 4;
  double speed = 5;
  double fuel_level = 6;
  string engine_status = 7;
}
message fleetevents {
  repeated fleetevent records = 1;
}

 

To optimize data transmission, individual fleet events (fleetevent) are consolidated into batches (fleetevents), meaning several events are combined into a single message and serialized into the Protobuf format before being sent to Kafka.
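
As an illustration, the batching step on the producer side might look like the following sketch. It assumes the Python classes generated by protoc from fleet_management.proto live in a module named fleet_management_pb2; the module name, sample readings, and the final publishing step are illustrative only.

import time
from fleet_management_pb2 import fleetevents   # hypothetical protoc-generated module

# Consolidate individual telemetry readings into a single fleetevents batch message
batch = fleetevents()
for vehicle_id, lat, lon, speed, fuel, status in [
    ("vehicle-27", 38.495878, 139.568327, 6.95, 59.42, "SERVICE_REQUIRED"),
    ("vehicle-27", -83.874246, 172.466784, 88.32, 79.39, "SERVICE_REQUIRED"),
]:
    event = batch.records.add()                # append a fleetevent to the repeated field
    event.vehicle_id = vehicle_id
    event.timestamp = int(time.time() * 1000)  # epoch milliseconds
    event.latitude, event.longitude = lat, lon
    event.speed, event.fuel_level = speed, fuel
    event.engine_status = status

# Compact Protobuf binary that becomes the Kafka message value; in a schema-registry
# setup, the Confluent Protobuf serializer would wrap this payload in its wire format.
payload = batch.SerializeToString()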

Here is an example of a raw Protobuf payload retrieved from the Kafka topic. This payload contains a batch of multiple events, each corresponding to individual telemetry readings from a vehicle.

 

"records": [
	{
  	"vehicle_id": "vehicle-27",
  	"timestamp": "1726397972207",
  	"latitude": 38.495878,
  	"longitude": 139.568327,
  	"speed": 6.95,
  	"fuel_level": 59.42,
  	"engine_status": "SERVICE_REQUIRED"
	},
...
...
...

	{
  	"vehicle_id": "vehicle-27",
  	"timestamp": "1726397972212",
  	"latitude": -83.874246,
  	"longitude": 172.466784,
  	"speed": 88.32,
  	"fuel_level": 79.39,
  	"engine_status": "SERVICE_REQUIRED"
	}
  ]
}

 

Ingesting Raw Data into Bronze Layer

The raw Protobuf data is ingested into the Bronze layer in two steps:

  • bronze_fleet_events_raw - the original Protobuf events in the binary format.
  • bronze_fleet_events - the original events deserialized into the structured format.

 

Ingesting Protobuf Events into Databricks Incrementally

The Bronze raw streaming table is where raw fleet telemetry events are ingested from Kafka in Protobuf binary format, ready for further processing. In the DLT pipeline, raw events are ingested into the bronze_fleet_events_raw table using the Spark readStream method, which continuously ingests streaming data from the Kafka topic in real time.

Streaming tables in DLT are stateful, incrementally computed, and only process data added since the last pipeline run. The readStream query that defines bronze_fleet_events_raw is shown in the function below.

 

# Bronze raw streaming table
@dlt.table(
 name="bronze_fleet_events_raw",
 comment="bronze (raw) table that ingests the protobuf events from the Kafka topic"
)
def bronze_fleet_events_raw():
 return (
   spark.readStream
     .format("kafka")
     .options(**kafka_read_options)
     .load()
 )
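
The kafka_read_options dictionary referenced above is defined elsewhere in the pipeline code. A minimal sketch, assuming a SASL/SSL-secured broker and credentials stored in a Databricks secret scope, could look like the following; the broker address, topic name, secret scope, and key names are placeholders.

# Hypothetical Kafka source options; broker, topic, and secret names are placeholders
kafka_user = dbutils.secrets.get(scope="kafka-creds", key="username")
kafka_password = dbutils.secrets.get(scope="kafka-creds", key="password")

kafka_read_options = {
    "kafka.bootstrap.servers": "broker-1.example.com:9092",
    "subscribe": "fleet-telemetry",        # Kafka topic carrying the Protobuf batches
    "startingOffsets": "earliest",         # first run reads the topic from the beginning
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{kafka_user}' password='{kafka_password}';"
    ),
}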

 

Sample result:  bronze_fleet_events_raw contains the raw Protobuf messages ingested from Kafka in the value column.

bronze_fleet_events_raw.png

 

Deserializing Protobuf Binary Data

Deserialization converts the Protobuf binary data into readable structured data that can be processed and queried within the pipeline. The function definition of the Bronze table bronze_fleet_events consists of the Protobuf deserialization logic using the from_protobuf function. 

The value column (containing the payload) and schema_registry_options are passed as parameters to the from_protobuf function, which deserializes the payload into a struct. The explode function is used here to flatten the batch messages (fleetevents) into the individual event (fleetevent) records for further processing. Kafka metadata fields such as key, partition, offset, and timestamp are retained in this table to maintain traceability from the raw Kafka data.

 

# Bronze deserialized streaming table
@dlt.table(
 name="bronze_fleet_events",
 comment="bronze (deserialized) table that handles the deserialization of the protobuf using the from_protobuf function"
)
def bronze_fleet_events():
   return (
       dlt.read_stream("bronze_fleet_events_raw")
         .select("key","partition","offset","timestamp", # metadata fields
           from_protobuf("value", options = schema_registry_options).alias("batch_message")) #payload
         .select("key","partition","offset","timestamp",
            explode("batch_message.*").alias("event"))
         .selectExpr(
           "event.vehicle_id",
           "event.timestamp as event_timestamp",
           "event.latitude",
           "event.longitude",
           "event.speed",
           "event.fuel_level",
           "event.engine_status",
           "key","partition","offset","timestamp"
       )
   )

 

Sample result: bronze_fleet_events contains the deserialized event data.

bronze_fleet_events.png

By default, the from_protobuf function uses the FAILFAST parsing mode, which immediately halts the pipeline and raises an error if any malformed records are encountered during deserialization. Refer to the Databricks documentation for more details.
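
If dropping malformed payloads is preferable to failing the stream, the parsing mode can be relaxed through the same options argument. A minimal sketch, assuming the mode option described in the Databricks Protobuf documentation (PERMISSIVE returns null for payloads that cannot be parsed, which can then be filtered out):

# Hedged variation of the deserialization step that tolerates malformed payloads
dlt.read_stream("bronze_fleet_events_raw").select(
    from_protobuf(
        "value",
        options={**schema_registry_options, "mode": "PERMISSIVE"}
    ).alias("batch_message")
).filter(col("batch_message").isNotNull())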

 

Native Databricks Protobuf Functions

To use the Protobuf functions from_protobuf and to_protobuf, you must provide either a schema registry via the options argument or a descriptor file identified by the descFilePath argument. 

The from_protobuf and to_protobuf functions are available on Databricks Runtime 12.2 LTS and above. The basic syntax for these functions is as follows:

 

# Casts a binary column to a struct
from_protobuf(
    data: 'ColumnOrName',
    messageName: Optional[str] = None,
    descFilePath: Optional[str] = None,
    options: Optional[Dict[str, str]] = None
)

# Casts a struct column back to binary
to_protobuf(
    data: 'ColumnOrName',
    messageName: Optional[str] = None,
    descFilePath: Optional[str] = None,
    options: Optional[Dict[str, str]] = None
)

 

It is considered best practice to use a Schema Registry, as it ensures centralized management and versioning of schemas and provides a robust mechanism for maintaining consistency and compatibility across producers and consumers in the data pipeline. The functions simplify serialization and deserialization by leveraging the schemas stored in the schema registry, eliminating the need to write custom serialization and deserialization logic manually.

Sensitive information, such as credentials for Kafka brokers and the Schema Registry, can be securely managed using Databricks Secrets.
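
To make the earlier examples concrete, a sketch of the schema_registry_options referenced in the Bronze deserialization step is shown below. It follows the option names used in the Databricks Protobuf documentation for Confluent Schema Registry; the registry address, subject name, secret scope, and key names are placeholders.

# Hypothetical Schema Registry options; address, subject, and secret names are placeholders
schema_registry_options = {
    "schema.registry.address": "https://psrc-xxxxx.confluent.cloud",
    "schema.registry.subject": "fleet-telemetry-value",   # subject holding the fleetevents schema
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": (
        dbutils.secrets.get(scope="schema-registry", key="api-key")
        + ":"
        + dbutils.secrets.get(scope="schema-registry", key="api-secret")
    ),
}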

 

Cleansing for Data Integrity in Silver Layer

The silver_fleet_events table handles event replays from edge devices and ensures data integrity by storing only the latest version of each event, following an SCD Type-1 approach. The apply_changes function in the DLT pipeline simplifies this change data capture (CDC) process; it requires specifying the source, target, keys, the sequencing column for the change feed, and the SCD type.

Because edge devices can retransmit previously sent events into the Kafka topic, the Silver table silver_fleet_events is configured as an SCD Type-1 table using the apply_changes function. When a previously sent event is ingested again into the DLT pipeline, the corresponding entry in the Silver table is updated. The table excludes the Kafka-specific metadata columns (key, partition, offset, and timestamp) and keeps only the business-relevant event data for further processing.

 

# Silver streaming table
dlt.create_target_table(
   name = "silver_fleet_events",
   comment = "Silver table (SCD type-1) that handles event replays from edge devices to ensure data integrity"
)


dlt.apply_changes(
   target = "silver_fleet_events",
   source = "bronze_fleet_events",
   keys = ["vehicle_id","event_timestamp"],
   except_column_list = ["key","partition","offset","timestamp"],
   sequence_by = col("event_timestamp"),
   stored_as_scd_type = 1
)

 

Sample result: silver_fleet_events is an SCD Type-1 table that ensures only the latest fleet telemetry events are stored.

silver_fleet_events.png

 

Aggregating Insights in Gold Layer

Lastly, the Silver table is used to populate two Gold summary materialized views, both within the same DLT pipeline.

  • The gold_fuel_efficiency view computes fuel efficiency as the distance traveled per unit of fuel consumed by each vehicle. The function fuel_efficiency_summary contains the view's aggregation logic.
  • The gold_vehicle_utilization view calculates the utilization rate of each vehicle by measuring how long it has been active based on its speed. The function vehicle_utilisation_summary contains the aggregation logic that underpins this view. Both functions are shown after the setup sketch below.
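
The window_spec referenced in the fuel-efficiency logic is defined earlier in the pipeline notebook and is not shown in the original snippets. A minimal sketch of that assumed setup, ordering each vehicle's events by event time so that lag() can compare a reading with the previous one:

# Assumed setup for the Gold views; sum and round below refer to the PySpark functions
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, when, sum, round

window_spec = Window.partitionBy("vehicle_id").orderBy("event_timestamp")
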
# Gold materialized views
@dlt.table(
 name="gold_fuel_efficiency",
 comment="calculates fuel efficiency (distance per unit of fuel)"
)
def fuel_efficiency_summary():
 return (
   dlt.read("silver_fleet_events")
      .withColumn("prev_fuel_level", lag("fuel_level").over(window_spec))
      .withColumn("fuel_consumed", (col("prev_fuel_level") - col("fuel_level")).cast("double"))
      .withColumn("prev_latitude", lag("latitude").over(window_spec))
      .withColumn("prev_longitude", lag("longitude").over(window_spec))
      .withColumn("distance_traveled", when(col("prev_latitude").isNotNull(),((col("latitude") - col("prev_latitude"))**2 +(col("longitude") - col("prev_longitude"))**2)**0.5).otherwise(0))
      .filter(col("fuel_consumed") > 0)
      .groupBy("vehicle_id").agg(
         round(sum("distance_traveled"),2).alias("total_distance_traveled"),
         round(sum("fuel_consumed"),2).alias("total_fuel_consumed")
      ).withColumn("fuel_efficiency",round(col("total_distance_traveled") / col("total_fuel_consumed"),2))
 )


@dlt.table(
 name="gold_vehicle_utilization",
 comment="calculates the vehicle utilization rate"
 )
def vehicle_utilisation_summary():
 return (
   dlt.read("silver_fleet_events")
      .withColumn("is_active", col("speed") > 0)
      .groupBy("vehicle_id").agg(
       sum(when(col("is_active"), col("event_timestamp")).otherwise(0)).alias("total_active_time"),
       sum(col("event_timestamp")).alias("total_time")
      ).withColumn("utilization_rate", (col("total_active_time") / col("total_time")) * 100)
 )
 

Sample results: 

  • gold_fuel_efficiency: Aggregates total distance traveled vs. fuel consumption.
  • gold_vehicle_utilization: Calculates vehicle utilization rates across time.

gold materialized views.png
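
Once the pipeline has run, these materialized views can be queried like any other table for reporting and dashboards. A quick illustrative query, assuming the pipeline publishes to a Unity Catalog schema (the catalog and schema names are placeholders):

# Identify the least fuel-efficient vehicles; catalog and schema names are placeholders
display(
    spark.sql("""
        SELECT vehicle_id, total_distance_traveled, total_fuel_consumed, fuel_efficiency
        FROM main.fleet_analytics.gold_fuel_efficiency
        ORDER BY fuel_efficiency ASC
        LIMIT 10
    """)
)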

 

Conclusion

Protobuf deserialization is a crucial step in processing high-volume, real-time data streams, especially when dealing with consolidated messages from edge devices. By leveraging the native from_protobuf function within a Serverless DLT pipeline, data engineering teams can simplify the deserialization process, reduce infrastructure overhead, improve deserialization performance, and minimize maintenance efforts. 

The Serverless nature of DLT further enhances this approach by eliminating the need for infrastructure management, allowing teams to focus solely on data processing and analytics. This streamlines the ingestion and processing of complex Protobuf messages and ensures that your data pipelines are scalable, cost-efficient, and easy to manage. Using declarative syntax in DLT, you can configure SCD Type 1 and 2 and other ETL needs (such as Data Quality Expectations) to build highly efficient ETL pipelines that support real-time analytics and decision-making. 

As streaming data continues to play a pivotal role across industries, adopting best practices like these in your ETL pipelines will be essential for maintaining a competitive edge and optimizing operational efficiency. Following the steps outlined in this blog, you can confidently implement Protobuf deserialization in your Serverless Databricks DLT pipelines, ensuring that your streaming applications are optimized for performance, scalability, and accuracy.