Internet of Things (IoT) technology has driven the real-time transmission, processing, and analysis of sensor data. In the automotive industry, this manifests as vehicles equipped with sensors that monitor various components, enabling use cases such as:
- Fleet management
- Connected vehicle partnerships with OEMs
Data transmitted by connected vehicles is not well-standardised. Existing standards (e.g. AEMP 2.0) lack universal adoption. Original Equipment Manufacturers (OEMs) often provide proprietary telematics solutions with unique data structures and formats, which creates a challenge for organisations looking to aggregate data from several providers. Normalisation involves processing data from diverse sources into a unified data model.
1. Ingestion methods
Ingestion patterns for IoT data are typically determined by the network communication architecture used for data exchange. One pattern is to stream events from sensors via Apache Kafka. This is typically useful in event-driven scenarios with urgent messages, such as diagnostic trouble codes, emergency assistance requests, or collision notifications.
Another pattern uses a pull-based approach that polls an API provided by aggregators or OEMs, allowing vehicle telemetry data such as speed, location, and fuel level to be queried at regular intervals or continuously. Apache Kafka is commonly used to ingest the telemetry data produced by these APIs as a real-time stream: it is a high-throughput event streaming platform that manages message queues with a publish/subscribe model.
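As a minimal sketch of this pattern, the snippet below reads a stream from a Kafka topic; the broker address and topic name are placeholders for illustration, and the payload arrives as bytes that are cast to a string for downstream parsing.
# Minimal sketch: stream telemetry events from a Kafka topic (broker and topic are hypothetical)
from pyspark.sql.functions import col

kafka_stream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker-1:9092")  # placeholder broker
    .option("subscribe", "vehicle-telemetry")             # placeholder topic
    .option("startingOffsets", "latest")
    .load()
)
# Kafka delivers the payload as binary; cast it to a string before deserialising
telemetry_raw = kafka_stream.withColumn("message", col("value").cast("string"))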
Other use cases involve working with historical aggregated data. For these scenarios, data can be ingested from files in cloud storage using Auto Loader.
2. Data Serialization
IoT data is often serialized into data structures that are easily transmitted. Deserializing messages with various serialization formats is often the first step in processing IoT data into standardized objects. This blog details the considerations required depending on the use case.
The formats chosen by OEMs typically depend on a few factors: readability of the encoding, schema flexibility, serialisation speed, and network bandwidth constraints. The table below compares the most common options, followed by a sketch of how each format maps to a Spark deserialisation function.
| | Protobuf | Avro | JSON |
| --- | --- | --- | --- |
| Encoding | Binary | Binary | Text |
| Schema | Strict; requires code generation | Strict; embedded in-message | Flexible |
| Serialization speed | Fast | Medium | Slowest |
| Packet size | Small | Medium | Large |
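As a rough sketch of how the format choice flows through to code (the schema strings, message name, and descriptor path below are illustrative assumptions), each wire format maps to a different Spark deserialisation function:
# Sketch: dispatch to the Spark function matching the OEM's wire format
# (schema strings, message name and descriptor path are hypothetical placeholders)
from pyspark.sql.functions import col, from_json
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.protobuf.functions import from_protobuf

def deserialise(df, wire_format, schema, desc_path=None):
    # Returns df with a 'payload' struct column decoded from the binary 'value' column
    if wire_format == "json":
        # schema: a DDL string, e.g. "vin STRING, speed DOUBLE"
        return df.withColumn("payload", from_json(col("value").cast("string"), schema))
    if wire_format == "avro":
        # schema: the Avro schema as a JSON string
        return df.withColumn("payload", from_avro(col("value"), schema))
    if wire_format == "protobuf":
        # schema: the Protobuf message name; desc_path points to a compiled descriptor file
        return df.withColumn("payload", from_protobuf(col("value"), schema, descFilePath=desc_path))
    raise ValueError(f"Unsupported format: {wire_format}")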
3. Data model & schemas
Whilst OEMs will often track similar data points, the IoT device, its installation, and its configuration will affect the level of detail, granularity, and availability of those data points. The chosen data format will also determine how schemas are defined and read within a pipeline. The result is different data models and schemas across OEMs that need to be normalised into a consistent data model.
4. Measuring Units
Sensors on vehicle systems usually track common metrics such as speed, pressure, and distance based on the measuring unit they are calibrated to. However, the measurement system can differ across OEMs, largely influenced by geography and hardware configuration; for example, distance may be reported in miles under the imperial system or in metres under the metric system.
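As a small illustration (the column and unit names here are assumptions), distances reported in miles can be converted to kilometres during normalisation so that downstream consumers always see a single unit:
# Sketch: normalise distance readings to kilometres (column and unit names are assumed)
from pyspark.sql import functions as F

df_normalised = df.withColumn(
    "odometer_km",
    F.when(F.col("odometer_unit") == "miles", F.col("odometer_value") * 1.60934)
     .otherwise(F.col("odometer_value"))  # already metric
)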
5. Business definitions
OEMs can also have different definitions and interpretations of a field. For example, speed measured at the vehicle's wheels can refer either to rpm (revolutions per minute) or to km/h. Clear definitions are needed to ensure the right field measurements are mapped between OEMs.
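For instance, if one OEM reports wheel speed in rpm while another reports km/h, an explicit, documented conversion keeps the two comparable. The sketch below assumes hypothetical column names and a tyre circumference in metres:
# Sketch: convert wheel rpm to km/h, where speed (km/h) = rpm x circumference (m) x 60 / 1000
from pyspark.sql import functions as F

df_speed = df.withColumn(
    "speed_kmh",
    F.col("wheel_rpm") * F.col("tyre_circumference_m") * 60 / 1000
)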
6. Data frequency
Devices on a vehicle generate data at different frequencies, depending on the type of data or event recorded. For example, GPS could generate a data point every second for near real-time tracking, while fuel level sensors only need to produce a reading every minute. This also needs to be normalised for consistency downstream.
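One way to normalise frequency, sketched below with assumed column names, is to aggregate the higher-frequency signal into fixed time windows (one minute here) so that all metrics line up downstream:
# Sketch: downsample 1-second GPS readings to one averaged point per vehicle per minute
from pyspark.sql import functions as F

gps_per_minute = (
    gps_df
    .groupBy("vin", F.window("timestamp", "1 minute"))
    .agg(
        F.avg("latitude").alias("latitude"),
        F.avg("longitude").alias("longitude")
    )
)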
7. Message batching
OEMs also take different approaches to batching the messages sent by the IoT device in order to conserve network bandwidth and transmission costs. This results in nested structures in a payload that record data at a more granular frequency than the transmission interval (e.g. 60 data points for a device that generates data every second but only transmits once per minute).
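The sketch below shows one way to unpack such a batched payload, assuming the message carries an array column of per-second readings; explode() turns the 60 nested data points into one row each:
# Sketch: flatten a batched message into one row per reading (the 'readings' array column is assumed)
from pyspark.sql import functions as F

df_flat = (
    batched_df
    .select("vin", "oem", F.explode("readings").alias("reading"))
    .select("vin", "oem", "reading.timestamp", "reading.speed")
)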
8. Multiple devices for the same metric
There can also be many devices in a vehicle that record the same measurement at different positions, for example door sensors, tyre pressure sensors, and light sensors. This typically produces one-to-many relationships in the generated fields, resulting in a struct (or array of structs) that needs to be normalised with reference to the position of the sensor that recorded the data (e.g. front left door).
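For example, an array of tyre pressure readings can be exploded into one row per sensor and keyed by its position, as in the sketch below (which assumes a tire_pressures array column similar to the sample payload shown later in this blog):
# Sketch: one row per tyre pressure sensor, keyed by its position on the vehicle
from pyspark.sql import functions as F

tyre_df = (
    df
    .select("vin", F.explode("tire_pressures").alias("tp"))
    .select(
        "vin",
        F.col("tp.data.location").alias("sensor_location"),  # e.g. front_left
        F.col("tp.data.pressure.value").alias("pressure_bar")
    )
)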
The challenges above can be grouped into the layers below. This blog concentrates mainly on the ingestion layer: source connectors, data deserialisation, and schema handling.
1. Ingestion from various sources
Connected vehicles provide data through a variety of integrations. Data may be forwarded to an event queue, shared via files, or polled from APIs. It can be difficult to manage the number and variety of integrations needed to ingest telematics for an entire fleet.
Databricks provides a variety of data source formats to satisfy these ingestion patterns, such as cloudFiles (Auto Loader) for files in cloud storage and kafka for event streams.
Spark’s DataStreamReader is the entry-point for streaming ingestion. A variety of options (e.g. the source format, file path, queue URL) can be provided, as shown in the example below.
# Set up options for Auto Loader:
options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": "/Volumes/...",
    "cloudFiles.schemaEvolutionMode": "addNewColumns"
}
# Create the data stream reader (the source format is set with .format();
# checkpointLocation is supplied later on the writeStream):
data_stream_reader = (
    spark
    .readStream
    .format("cloudFiles")
    .options(**options)
)
# Get a streaming DataFrame:
df = data_stream_reader.load("/Volumes/...")
2. Data modelling / flattening
To address the various data formats and schemas, the data can be deserialised using built-in functions such as from_json, from_avro, and from_protobuf, and read into a target schema defined in a centralised repository.
i) Data formats
Whilst the functions above allow entire structures to be materialised, they require a complete schema definition, which can be challenging with deeply nested structures. It can be more efficient and performant to extract only the fields that are required.
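As a brief sketch (the field names are assumptions), individual fields can be pulled from a raw JSON string with get_json_object() without declaring the full nested schema:
# Sketch: extract only the required fields from a raw JSON message, with no full schema definition
from pyspark.sql.functions import col, get_json_object

df_selected = raw_df.select(
    get_json_object(col("message"), "$.vin").alias("vin"),
    get_json_object(col("message"), "$.diagnostics.fuel_level.data").alias("fuel_level")
)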
ii) Centralised schema repository
Schemas for streaming data can either be defined manually or held in a schema registry. In Databricks, functions such as from_avro and to_avro can use an integrated Schema Registry, either through a cluster running a Schema Registry service or through a connection to Confluent. The schema registry can also track schema changes, with a schema identifier embedded in each record to future-proof the stream. External schema management tools such as Flyway and Liquibase can also be integrated with Databricks.
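As a sketch of the Confluent route (the registry URL and subject name are placeholders, and kafka_stream is assumed to be a Kafka streaming DataFrame such as the one sketched earlier), the latest registered Avro schema can be fetched with the confluent-kafka client and passed to from_avro:
# Sketch: fetch the latest Avro schema for a subject from a Confluent Schema Registry (URL and subject are placeholders)
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import from_avro

registry = SchemaRegistryClient({"url": "https://schema-registry.example.com"})
latest = registry.get_latest_version("vehicle-telemetry-value")
avro_schema = latest.schema.schema_str

# Confluent-framed messages prefix the Avro body with a 5-byte header (magic byte + schema ID); skip it
avro_payload = F.expr("substring(value, 6, length(value) - 5)")
df_decoded = kafka_stream.withColumn("payload", from_avro(avro_payload, avro_schema))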
Schemas for the data models can instead be defined manually as part of a config (shown as a dictionary in the code example below). For streams carrying packets with different schemas, the configs can be stored in a table. This decouples schema definitions from the processing logic in code and makes data model changes easy: all changes are made in one place under version control, reducing deployment risk whilst enabling rapid schema evolution.
Both approaches allow data in various formats and schemas to be read into their respective DataFrames with the defined schema. Column naming standards can be stored as config and applied through a mapping for consistency, either during table creation or with the withColumnRenamed() function.
When OEMs do not offer the same data availability or granularity, but a field needs to be retained within the data model where available, Spark's unionByName() function allows the set of column names to differ between DataFrames via the allowMissingColumns parameter; missing columns are filled with nulls.
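A brief sketch with two hypothetical OEM DataFrames, where only the first provides fuel_level:
# Sketch: combine OEM feeds whose columns differ; missing columns are filled with nulls
oem1_df = spark.createDataFrame([("VIN1", 55.0, 0.82)], "vin STRING, speed DOUBLE, fuel_level DOUBLE")
oem2_df = spark.createDataFrame([("VIN2", 61.0)], "vin STRING, speed DOUBLE")

combined_df = oem1_df.unionByName(oem2_df, allowMissingColumns=True)  # oem2 rows get a null fuel_level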
Below are a couple of examples illustrating the steps taken to read JSON data into a DataFrame with a standardised structure using a defined schema.
a. A single stream with the same schema
In this first example, the sample payload below is extracted from an OEM's API, where all event types are contained within one stream using a single schema. Whilst this approach simplifies ingestion, it introduces field redundancy because all events are sent at the same frequency. The resulting schema is also typically more complex and deeply nested.
# Example Payload
record = {
"vin": "1HMS96P6WTX72MDBN",
"oem": "OEM1",
"timestamp": "2025-02-25T19:58:21.636Z",
"coordinates": {"latitude":51.465567,"longitude":-0.288331},
"diagnostics_code": [{"name": "tire_warning_system_error", "state": "off"},
{"name": "battery_low_warning", "state": "off"},
{"name": "brake_fluid_warning", "state": "off"}
],
"diagnostics":
{
"fuel_level": {"data": 0.824427480916031},
"odometer": {"data": {"unit": "kilometers", "value": 2145.83}},
"tire_pressures": [
{"data": {"location": "spare", "pressure": {"unit": "bars", "value": 2.25}}, "failure": None},
{"data": {"location": "rear_left_outer", "pressure": {"unit": "bars", "value": 2.25}}, "failure": None},
{"data": {"location": "rear_right_outer", "pressure": {"unit": "bars", "value": 2.25}}, "failure": None},
{"data": {"location": "rear_left", "pressure": {"unit": "bars", "value": 2.24}}, "failure": None},
{"data": {"location": "rear_right", "pressure": {"unit": "bars", "value": 2.24}}, "failure": None},
{"data": {"location": "front_right", "pressure": {"unit": "bars", "value": 2.31}}, "failure": None},
{"data": {"location": "front_left", "pressure": {"unit": "bars", "value": 2.2}}, "failure": None}
]
}
}
Each payload can be stored as a row in a DataFrame when streamed in. The JSON record can be converted to the Variant type, a data type optimised for semi-structured data.
# Convert to Variant type
import json

from pyspark.sql import Row
from pyspark.sql import functions as F

record_json = json.dumps(record)
row = Row(record_json=record_json)
df = spark.createDataFrame([row], schema='record_json STRING')\
    .withColumn("record_variant", F.parse_json(F.col("record_json")))\
    .drop("record_json")
To parse fields from the JSON structure, a schema is defined as key-value pairs so that only the selected fields required in the common data model are extracted. The schema can be stored either in a table or as a config file that is version controlled as part of a deployment pipeline. Defining the schema this way also ensures consistent column naming standards.
# Schema defined as a dict (variant paths use the colon operator followed by dot notation)
schema = {
    "oem": "record_variant:oem",
    "timestamp": "record_variant:timestamp",
    "vin": "record_variant:vin",
    "latitude": "record_variant:coordinates.latitude",
    "longitude": "record_variant:coordinates.longitude",
    "fuel_level": "record_variant:diagnostics.fuel_level.data",
    "odometer": "record_variant:diagnostics.odometer.data.value",
    "tire_pressures": "record_variant:diagnostics.tire_pressures"
}
Finally, the selected fields from JSON can be read into a dataframe based on the schema.
# Generating new dataframe based on field mapping
def create_table(df, schema):
col_list = [f'{value} as {key}' for key, value in schema.items()]
df_schema = df.selectExpr(*col_list)
return df_schema
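For example, applying the helper to the Variant DataFrame and the schema dictionary defined above:
# Apply the field mapping to the Variant DataFrame
df_standardised = create_table(df, schema)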
This generates the resulting table.
b. A multiplex stream with JSON payloads in two different schemas
In this second example, we focus on production systems that often need to process data from multiple event types, vehicle types, or manufacturers. Varied data may arrive via the same event broker in a multiplexed stream; for example, we may need to process data from several OEMs arriving via the same Kafka topic, each delivering data with a different schema. Using sample data, each OEM's schema can be registered in a schema table as shown below.
CREATE TABLE IF NOT EXISTS dbx.demo.schema_tbl (
  oem_id BIGINT GENERATED ALWAYS AS IDENTITY,
  oem_name STRING NOT NULL,
  schema_json STRING NOT NULL,
  event_schema STRING NOT NULL, -- Spark-friendly schema string derived from a JSON example
  CONSTRAINT pk_schema_tbl PRIMARY KEY (oem_id)
);

-- schema_of_json() requires a literal argument, so the example JSON is repeated to derive the schema string
INSERT INTO dbx.demo.schema_tbl (oem_name, schema_json, event_schema)
VALUES
( -- OEM_001 schema
  'oem_001',
  '{"oem_id": "oem_001", "deviceid": "sensor-001", "unixtime": 1000000000000, "lat": 0.0, "lon": 0.0, "heading": 0}',
  schema_of_json('{"oem_id": "oem_001", "deviceid": "sensor-001", "unixtime": 1000000000000, "lat": 0.0, "lon": 0.0, "heading": 0}')
),
( -- OEM_002 schema
  'oem_002',
  '{"oem_id": "oem_002", "deviceid": "2310170-0054", "event_info": {"event_ts": "2024-05-01T00:02:56.000Z", "unixtime": 1714521776, "lat": 36.109165, "lon": -79.55684}, "vehicle_info": {"fuel_lvl": 72.4, "in_motion": true, "engine_running": false}}',
  schema_of_json('{"oem_id": "oem_002", "deviceid": "2310170-0054", "event_info": {"event_ts": "2024-05-01T00:02:56.000Z", "unixtime": 1714521776, "lat": 36.109165, "lon": -79.55684}, "vehicle_info": {"fuel_lvl": 72.4, "in_motion": true, "engine_running": false}}')
);
When processing vehicle data with Structured Streaming, we can use a foreachBatch sink to apply custom logic to each incremental batch of data. Our foreachBatch function can:
1. Look up each registered OEM and its message schema in the schema table
2. Filter the arriving batch to the rows belonging to that OEM
3. Deserialise the JSON payload with the corresponding schema
4. Write the result to that OEM's Delta table
# Extract the OEM identifier from each message in multiplex_df (the multiplexed stream containing data from both OEMs) using get_json_object()
from pyspark.sql.functions import col, from_json, get_json_object

stream_df = multiplex_df.withColumn('oem_key', get_json_object(col('value').cast('string'), '$.oem_id'))
# List the registered OEM schemas and loop over each, applying the corresponding schema to deserialise the JSON
def write_events_to_delta(batch_df, batch_id):
    # List the registered OEMs and their schemas:
    oem_schemas = (
        spark.read
        .table('dbx.demo.schema_tbl')
        .selectExpr('oem_name', 'event_schema')
        .collect()
    )
    for oem in oem_schemas:
        # Get the OEM name, its schema, and the target table name:
        oem_name = oem['oem_name']
        schema = oem['event_schema']
        tbl_name = f'dbx.demo.{oem_name}_events'
        (
            batch_df.where(f'oem_key = "{oem_name}"')  # Filter to the rows that use this OEM's schema
            .withColumn('message', col('value').cast('string'))
            .withColumn('json', from_json('message', schema))  # Deserialise the JSON into a struct column using from_json
            .select('json.*')
            .write
            .format('delta')
            .mode('append')
            .saveAsTable(tbl_name)  # Write results to this OEM's Delta table
        )
# Use foreachBatch to apply this
silver_write = (
stream_df
.writeStream
.queryName('streaming_write_events_to_delta')
.option('checkpointLocation', '/Volumes/....')
.foreachBatch(write_events_to_delta)
.start()
)
Databricks provides a comprehensive solution for addressing the challenges in telematics data across a variety of ingestion patterns, data formats, and schemas through built-in connectors and functions native to the platform. Ingestion connectors for custom data sources can also be created using the Python Data Source API.
By implementing centralised schema management, using Databricks' transformation capabilities for semi-structured data, and adopting a standardised approach to metric conversion, OEMs can overcome the challenges of data normalisation and deliver value from real-time connected vehicle data.