
Protobuf deserialization in Databricks

sourander
New Contributor III

Hi,

Let's assume I have these things:

  • Binary column containing protobuf-serialized data
  • The .proto file including message definition

What different approaches have Databricks users chosen to deserialize the data? Python is the programming language that I am mostly familiar with, so anything that can be achieved using pyspark would be great.

Locally, my approach would be to use grpc_tools.protoc to generate the pb2 files (message.proto -> message_pb2.py). I would later import these classes and use the appropriate message to deserialize the binary data. Example code below:

import pkg_resources
from grpc_tools import protoc

# Paths (adjust to your environment)
PROTO_PATH = "protos"                      # directory containing the .proto files
OUTPUT_PATH = "generated"                  # output directory for the generated *_pb2.py modules
protopath = f"{PROTO_PATH}/message.proto"  # the .proto file to compile

# Verbose
print(f"[Building] {protopath}")

# grpc_tools.protoc doesn't include its bundled well-known types by default,
# which causes errors such as: '"google.protobuf.Timestamp" is not defined.'
path_to_module = pkg_resources.resource_filename('grpc_tools', '_proto')

# Flags: the first element mimics argv[0] and is ignored by protoc itself
args = (
    'grpc_tools.protoc',
    f"--proto_path={PROTO_PATH}",
    f"--python_out={OUTPUT_PATH}",
    f"-I{path_to_module}",
    # f"--grpc_python_out={OUTPUT_PATH}",  # only needed if gRPC service stubs are wanted
    protopath.split("/")[-1],
)

protoc.main(args)
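
Once the generated message_pb2.py is importable, my plan would be to wrap the actual deserialization in a plain Python UDF, roughly like this (MyMessage and the payload column name are placeholders, and the generated module would also need to be available on the workers):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from google.protobuf.json_format import MessageToJson

# Hypothetical generated message class; the real name comes from the .proto file
from message_pb2 import MyMessage

@udf(returnType=StringType())
def parse_proto(payload: bytes) -> str:
    # Deserialize the protobuf bytes and return the message as a JSON string
    msg = MyMessage()
    msg.ParseFromString(payload)
    return MessageToJson(msg)

# "payload" is the binary column holding the serialized messages
parsed_df = df.select(parse_proto("payload").alias("parsed_json"))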

My current ideas are:

  • Perform the code above on an external machine. Create a package "my_message_deserializer.whl" and use it as a dependent library on the Job/Task/Cluster. This would need to be rebuilt each time the proto file changes, e.g. via git webhooks.
  • Or, in Databricks, install grpcio and grpcio-tools, and run similar code as above on the driver. Then import the created pb2 class and use the message as usual (a rough sketch of what I mean is below this list).
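
For the second idea, something like this on the driver (the paths, the message.proto file name, and MyMessage are placeholders):

import os
import sys
from grpc_tools import protoc

PROTO_DIR = "/tmp/protos"        # placeholder: directory containing message.proto
OUTPUT_PATH = "/tmp/generated"   # placeholder: output directory for the generated module
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Compile the .proto on the driver (same invocation as above)
protoc.main((
    'grpc_tools.protoc',
    f"--proto_path={PROTO_DIR}",
    f"--python_out={OUTPUT_PATH}",
    "message.proto",
))

# Make the generated module importable on the driver ...
sys.path.append(OUTPUT_PATH)

# ... and ship it to the executors so UDFs can import it as well
spark.sparkContext.addPyFile(f"{OUTPUT_PATH}/message_pb2.py")

from message_pb2 import MyMessage  # hypothetical message name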

Is there any other way of using the deserializer with Spark? Something a bit less manual?

15 REPLIES

Amou
New Contributor II

We've now added a native connector for parsing Protobuf directly into Spark DataFrames.
https://docs.databricks.com/en/structured-streaming/protocol-buffers.html

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)
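
If you only have a .proto file and no schema registry, from_protobuf also accepts a compiled descriptor file instead of the registry options. A minimal sketch (the MyMessage name and the descriptor path are placeholders):

from pyspark.sql.protobuf.functions import from_protobuf

# Descriptor generated with: protoc --include_imports --descriptor_set_out=message.desc message.proto
descriptor_file_path = "/dbfs/protos/message.desc"  # placeholder path

proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", "MyMessage", descFilePath = descriptor_file_path)
        .alias("proto_event")
    )
)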

  
