cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Protobuf deserialization in Databricks

sourander
New Contributor III

Hi,

Let's assume I have these things:

  • Binary column containing protobuf-serialized data
  • The .proto file including message definition

What different approaches have Databricks users chosen to deserialize the data? Python is the programming language that I am mostly familiar with, so anything that can be achieved using pyspark would be great.

Locally, my approach would be to use the grpc_tool.protoc to generate the pb2 files (message.proto -> message_pb2.python). I would later on import these classes and use the approariate message to deserialize the binary data. Example code below:

import os
import pkg_resources
from grpc_tools import protoc
 
# Verbose
print(f"[Building] {protopath}")
    
# For some reason, grpc_tools.protoc doesn't include _proto module which causes errors:
# "google.protobuf.Timestamp" is not defined.
path_to_module = pkg_resources.resource_filename('grpc_tools', '_proto')
 
# Flags    
args = (
    'grpc_tools.protoc',  # args 0 is not needed
    f"--proto_path={PROTO_PATH}",
    f"--python_out={OUTPUT_PATH}",
    f"-I{path_to_module}",
    # f"--grpc_python_out={OUTPUT_PATH}", 
    protopath.split("/")[-1]
)
    
protoc.main(args)

My current ideas are:

  • Perform the code above using an external machine. Create a package "my_message_derializer.wheel" and use this as a dependent library on the Job/Task/Cluster. This would need to be updated each time the proto file changes using e.g. git webhooks.
  • Or, in the Databricks, install grpcio and grpcio-tools, and run similar code as above on the driver. Then import the created pb2 class and use the message as usual.

Is there any other way of using the deserializer with Spark? Something a bit less manual?

1 ACCEPTED SOLUTION

Accepted Solutions

sourander
New Contributor III

In case this helps someone else, I will leave the code here for reference that I use for running the method in Databricks inside a UDF.

from typing import Iterator
 
@pandas_udf("string")
def my_test_function(blobs: Iterator[pd.Series]) -> Iterator[pd.Series]:
    d = ProtoFetcher("my_message_name")
    for blob in blobs:
        yield blob.apply(d.blob_to_json)
        
new_df = df.withColumn("col_as_json", my_test_function("original_col"))

For a small dataset, this performed just as the non-Pandas UDF, but I suppose this will change when I scale the dataset up.

View solution in original post

15 REPLIES 15

Kaniz
Community Manager
Community Manager

Hi @ sourander ! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers on the community have an answer to your question first. Or else I will follow up shortly with a response.

sourander
New Contributor III

@Kaniz Fatma​, the community hasn't provided answers yet. Similar question has appeared in the Databricks Forums earlier and there are no ideas either. Do you have any ideas how to perform this deserialization efficiently in Databricks? Can we somehow give Spark a protobuf file/message that it would use as a serde for a given column?

Kaniz
Community Manager
Community Manager

Hi @Jani Sourander​ ,

My team shall get back to you shortly with a response.

Thank you for your patience😊 .

Anonymous
Not applicable

@Jani Sourander​ - Following up on Kaniz's answer, we have escalated the issue to the proper team. As she said, they'll get back to you as soon as they can.

jose_gonzalez
Moderator
Moderator

Hi @Jani Sourander​ ,

I found this library sparksql-protobuf it might work. it has not been updated in a while.

Thank you

sourander
New Contributor III

Hi @Jose Gonzalez​ and thank you for the reply! That library seems out of date and lacks documentation. I wonder if ScalaPB would be a better option. I don't have any scala experience, but since scala UDF's perform well with Spark. I suppose learning Scala and taking this approach would be an option. There would be some other benefits too (such as writing other Databricks UDF's in Scala for performance increasement)

It is starting to seem that the best option for MVP is actually the Python-based option that is used as a wheel library in Databricks.

Dan_Z
Honored Contributor
Honored Contributor

I would definitely go with a Python option here, that way you can use PandasUDFs which are faster than scala UDFs. Happy coding!

sourander
New Contributor III

@Dan Zafar​ do you have any ideas on how would I optimize the query that requires an access to a class method? Details below.

I created a class ProtoFetcher that hosts various proto_pb2.py files create using protoc.tools (similarly as in the original post). It can be instantiated by giving a name of the data; internally it imports the correct some_proto_pb2 class, assigns it to class variables and uses the get_attr("some_proto_pb2", "name_of_this_message") to fetch the correct GeneratedProtocolMessageType (which is defined in google.protobuf.pyext.cpp_message).

Sadly, I can't seem to find a way to use the UDF so that the class initialisation wouldn't be inside the function. If I'm not completely wrong, this means that the class will get initialised for each row as a loop. Trying to access the method from outside the udf definition will raise a "PicklingError: Could not serialize object: TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object"

@udf("string")
def my_test_func(blob):
    d = ProtoFetcher("name_of_this_data")
    return d.blob_to_json(blob)
 
new_df = df.withColumn("blob_as_json", my_test_func("original_blob_col"))

Running a display() on that new_df takes about 40 seconds, and this test file has only 600 rows. I also tried the pandas_udf approch and got the similar results. I defined the pandas_udf as:

@pandas_udf("string")
def my_test_function(s: pd.Series) -> pd.Series:
    d = ProtoFetcher("name_of_this_data")
    s_json = s.apply(d.blob_to_json)
    return s_json

 NOTE: The blob column has a fair amount of data, though. Running the same code using local Pandas installation will take 32 seconds on a fairly powerful laptop. This was being run with a code:

df['new_col'] = df['original_col'].apply(d.blob_to_json)

Dan_Z
Honored Contributor
Honored Contributor

Just use an iterator of Series UDF. See here: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html#iterator-of-series-to-iter...

This allows you to set up some pre-defined state (like loading a file) before doing the computation.

sourander
New Contributor III

In case this helps someone else, I will leave the code here for reference that I use for running the method in Databricks inside a UDF.

from typing import Iterator
 
@pandas_udf("string")
def my_test_function(blobs: Iterator[pd.Series]) -> Iterator[pd.Series]:
    d = ProtoFetcher("my_message_name")
    for blob in blobs:
        yield blob.apply(d.blob_to_json)
        
new_df = df.withColumn("col_as_json", my_test_function("original_col"))

For a small dataset, this performed just as the non-Pandas UDF, but I suppose this will change when I scale the dataset up.

Dan_Z
Honored Contributor
Honored Contributor

Thanks @Jani Sourander​ , you can mark this as 'Best Answer' so that the question is resolved and answer is easy to find for future users.

Anonymous
Not applicable

@Jani Sourander​  would you mind posting the logic that you used in your ProtoFetched class, I am running into the same cannot pickle issue for a protobuf pb2.py file and attempted to re-create the ProtoFetched class and the my_test_function but am still receiving the error.

Thank you

sourander
New Contributor III

Hi,

The ProtoFetcher is just a wrapper for the pb2 file(s) created using the protoc. As of now, I am using Scala to do the same trick (it was about 5x faster).

To avoid the pickling problems, you need to build the ProtoFetcher library into a wheel file on a non-Databricks cluster (e.g. your laptop.) I personally prefer Python Poetry for managing libraries and the build process. Then upload this wheel to S3 and use is as a cluster library in Databricks.

The class itself is simply a wrapper for pb2 files:

from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.json_format import MessageToJson
from ProtoFetcher.PROTO_OUT import example_message_pb2
 
 
# Hard-coded list of implemented messages. A single proto can contain many useful messages.
known_messages = {
    "example_message": {
        "proto_source": "example_message.proto",
        "class": example_message_pb2,
        "proto_messagename": "ExampleMessageList",
        "proto_sub_messagename": "ExampleMessageValues",
    },
}
 
 
class ProtoFetcher:
    """A class to represent a proto message. A wrapper for google.protobuf.
 
    Parameters
    ----------
    messagename : str
        the common name of the service.
 
    Examples
    --------
    >>> service = ProtoFetcher('example_message')
    >>> serde = service.get_deserializer()
    >>> 'some_field_that_is_part_of_the_proto_message' in serde.DESCRIPTOR.fields_by_name.keys()
    True
    """
 
    def __init__(self, messagename: str):
        """Constructs the proto message wrapper."""
 
        # Input check
        if messagename not in known_messages.keys():
            raise NameError(f"The service must be one of: {self.list_message_names()}")
 
        # Settings/Info for this messagename from the config
        self.msg_info = known_messages[messagename]
 
        # Initialize the message
        self.deserializer = self._init_deserializer_class()
 
    def _init_deserializer_class(self) -> GeneratedProtocolMessageType:
        """Gets the protocol message type from the Protobuf descriptors (_pb2.py file).
        It can be used for serializing and deserializing the messages.
        """
        # Get
        message = getattr(self.msg_info["class"], self.msg_info["proto_messagename"])
 
        # Some messages include a submessage. This happens at least when
        # a blob field description is a list of lists.
        if self.msg_info["proto_sub_messagename"]:
            message = getattr(message, self.msg_info["proto_sub_messagename"])
 
        return message
 
    def get_deserializer(self) -> GeneratedProtocolMessageType:
        """Getter for the message that can be used for serializing and deserializing the messages."""
        return self.deserializer
 
    def blob_to_json(self, blob) -> str:
        """Converts the bytes-like BLOB object to a the human-readable form.
 
        Parameters
        ----------
        blob : str/bytes
            bytes-like object or ASCII string including the contents of the blob field.
 
        Returns
        -------
        j : str
            Stringified JSON containing the deserialized message data.
        """
 
        # Deserialize the proto
        deserializer = self.get_deserializer()
        message = deserializer.FromString(blob)
 
        # Convert to JSON and clean it
        my_message_as_json = MessageToJson(message)
        j = json.dumps(json.loads(my_message_as_json))
        return j

Anonymous
Not applicable

 Your article content is extremely fascinating. I'm extremely intrigued with your post. I desire to get more incredible posts.

Direct 2 HR

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.