09-15-2021 03:36 AM
Hi,
Let's assume I have these things:
What different approaches have Databricks users chosen to deserialize the data? Python is the programming language that I am mostly familiar with, so anything that can be achieved using pyspark would be great.
Locally, my approach would be to use grpc_tools.protoc to generate the pb2 files (message.proto -> message_pb2.py). I would then import these classes and use the appropriate message to deserialize the binary data. Example code below:
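import os
import pkg_resources
from grpc_tools import protoc

# Placeholder paths (hypothetical values): adjust them to your project layout
PROTO_PATH = "protos"                # directory containing the .proto files
OUTPUT_PATH = "proto_out"            # directory for the generated *_pb2.py files
protopath = "protos/message.proto"   # the .proto file to compile

# Verbose
print(f"[Building] {protopath}")

# For some reason, grpc_tools.protoc doesn't include its _proto directory, which causes errors:
# "google.protobuf.Timestamp" is not defined.
path_to_module = pkg_resources.resource_filename('grpc_tools', '_proto')

# Flags
args = (
    'grpc_tools.protoc',  # args[0] only stands in for the program name
    f"--proto_path={PROTO_PATH}",
    f"--python_out={OUTPUT_PATH}",
    f"-I{path_to_module}",
    # f"--grpc_python_out={OUTPUT_PATH}",
    os.path.basename(protopath),
)

protoc.main(args)

My current ideas are: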
Is there any other way of using the deserializer with Spark? Something a bit less manual?
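For reference, the local deserialization step described above (import a generated class, parse the bytes, convert to JSON) can be sketched roughly as follows. This assumes the protobuf package is installed. SourceContext, a small message bundled with that package, only stands in for the message from your own message_pb2 module, and the file name is a made-up value.

```python
import json

from google.protobuf.json_format import MessageToJson
from google.protobuf.source_context_pb2 import SourceContext  # stand-in for your own *_pb2 class

# Build and serialize a message so that there is some binary data to work with.
original = SourceContext(file_name="events.proto")  # hypothetical value
blob = original.SerializeToString()

# Deserialize the bytes back into a message object.
parsed = SourceContext.FromString(blob)

# MessageToJson returns a JSON string; field names are lowerCamelCase by default.
as_json = MessageToJson(parsed)
as_dict = json.loads(as_json)
```

With a generated module of your own, only the import and the class name should need to change.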
09-29-2021 11:49 PM
@Kaniz Fatma, the community hasn't provided answers yet. A similar question appeared in the Databricks Forums earlier and received no ideas either. Do you have any ideas on how to perform this deserialization efficiently in Databricks? Can we somehow give Spark a protobuf file/message that it would use as a serde for a given column?
09-30-2021 10:29 AM
@Jani Sourander - Following up on Kaniz's answer, we have escalated the issue to the proper team. As she said, they'll get back to you as soon as they can.
10-11-2021 04:41 PM
Hi @Jani Sourander,
I found this library, sparksql-protobuf, which might work, although it has not been updated in a while.
Thank you
10-11-2021 11:11 PM
Hi @Jose Gonzalez, and thank you for the reply! That library seems out of date and lacks documentation. I wonder if ScalaPB would be a better option. I don't have any Scala experience, but since Scala UDFs perform well with Spark, I suppose learning Scala and taking this approach would be an option. There would be some other benefits too (such as writing other Databricks UDFs in Scala for better performance).
It is starting to seem that the best option for an MVP is actually the Python-based option, used as a wheel library in Databricks.
10-20-2021 02:53 PM
I would definitely go with a Python option here, that way you can use PandasUDFs which are faster than scala UDFs. Happy coding!
10-28-2021 06:18 AM
@Dan Zafar do you have any ideas on how I could optimize a query that requires access to a class method? Details below.
I created a class ProtoFetcher that hosts various proto_pb2.py files created using grpc_tools.protoc (as in the original post). It is instantiated by giving the name of the data; internally it imports the correct some_proto_pb2 module, assigns it to class variables and uses getattr(some_proto_pb2, "name_of_this_message") to fetch the correct GeneratedProtocolMessageType (which is defined in google.protobuf.pyext.cpp_message).
Sadly, I can't seem to find a way to use the UDF so that the class initialisation happens outside the function. If I'm not completely wrong, this means that the class gets initialised again for each row. Trying to access the method from outside the UDF definition raises a "PicklingError: Could not serialize object: TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object"
from pyspark.sql.functions import udf

@udf("string")
def my_test_func(blob):
    # ProtoFetcher is created inside the function, so on every call
    d = ProtoFetcher("name_of_this_data")
    return d.blob_to_json(blob)

new_df = df.withColumn("blob_as_json", my_test_func("original_blob_col"))

Running a display() on that new_df takes about 40 seconds, and this test file has only 600 rows. I also tried the pandas_udf approach and got similar results. I defined the pandas_udf as:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def my_test_function(s: pd.Series) -> pd.Series:
    d = ProtoFetcher("name_of_this_data")
    s_json = s.apply(d.blob_to_json)
    return s_json

NOTE: The blob column has a fair amount of data, though. Running the same code using a local Pandas installation takes 32 seconds on a fairly powerful laptop. This was run with the code:
df['new_col'] = df['original_col'].apply(d.blob_to_json)
10-28-2021 08:32 AM
Just use an iterator of Series UDF. See here: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html#iterator-of-series-to-iter...
This allows you to set up some pre-defined state (like loading a file) before doing the computation.
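The idea of setting up state once and then streaming batches through it can be shown without Spark. The sketch below is only an illustration in plain Python: ExpensiveDecoder is a hypothetical stand-in for any costly-to-build deserializer, and lists of bytes stand in for the pandas Series batches that an iterator of Series UDF receives.

```python
from typing import Iterable, Iterator, List


class ExpensiveDecoder:
    """Hypothetical stand-in for a deserializer that is costly to construct."""

    instances_created = 0  # counts how many times the constructor has run

    def __init__(self) -> None:
        ExpensiveDecoder.instances_created += 1

    def decode(self, blob: bytes) -> str:
        return blob.decode("utf-8").upper()


def decode_batches(batches: Iterable[List[bytes]]) -> Iterator[List[str]]:
    # State is created once per call of the generator, not once per batch or row.
    decoder = ExpensiveDecoder()
    for batch in batches:
        yield [decoder.decode(blob) for blob in batch]


batches = [[b"a", b"b"], [b"c"]]
result = list(decode_batches(batches))
```

Here the decoder is constructed a single time although two batches are processed. In an iterator of Series UDF the same structure applies, with the setup running once per invocation of the function rather than once per batch.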
10-28-2021 10:56 PM
In case this helps someone else, I will leave here, for reference, the code that I use for running the method in Databricks inside a UDF.
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def my_test_function(blobs: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Created once, before the batches are iterated
    d = ProtoFetcher("my_message_name")
    for blob in blobs:
        yield blob.apply(d.blob_to_json)

new_df = df.withColumn("col_as_json", my_test_function("original_col"))

For a small dataset, this performed just like the non-Pandas UDF, but I suppose this will change when I scale the dataset up.
10-29-2021 02:41 PM
Thanks @Jani Sourander, you can mark this as 'Best Answer' so that the question is resolved and the answer is easy to find for future users.
04-01-2022 08:51 AM
@Jani Sourander would you mind posting the logic that you used in your ProtoFetcher class? I am running into the same "cannot pickle" issue for a protobuf pb2.py file. I attempted to re-create the ProtoFetcher class and the my_test_function but am still receiving the error.
Thank you
04-03-2022 10:41 PM
Hi,
The ProtoFetcher is just a wrapper for the pb2 file(s) created using protoc. As of now, I am using Scala to do the same trick (it was about 5x faster).
To avoid the pickling problems, you need to build the ProtoFetcher library into a wheel file on a non-Databricks machine (e.g. your laptop). I personally prefer Python Poetry for managing libraries and the build process. Then upload this wheel to S3 and use it as a cluster library in Databricks.
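As a side note on the pickling error itself, it can be reproduced without Spark or protobuf. The sketch below is not what was done in this thread: it uses a threading.Lock as a stand-in for the unpicklable descriptor and shows a generic Python pattern (pickle only the name, rebuild the member on unpickling). The class names are hypothetical, and this has not been tested against Spark's own serializer.

```python
import pickle
import threading


class Handle:
    """Stand-in for a wrapper that holds an unpicklable member."""

    def __init__(self, name: str):
        self.name = name
        self.resource = threading.Lock()  # locks cannot be pickled


class RebuildingHandle(Handle):
    """Same wrapper, but only the name is pickled and the member is rebuilt."""

    def __getstate__(self):
        return {"name": self.name}

    def __setstate__(self, state):
        # Re-run the constructor on the receiving side to recreate the member.
        self.__init__(state["name"])


def can_pickle(obj) -> bool:
    """Returns True if pickle.dumps succeeds, False if it raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


plain_ok = can_pickle(Handle("example_message"))
restored = pickle.loads(pickle.dumps(RebuildingHandle("example_message")))
```

The plain wrapper fails to pickle because of its member, while the second variant round-trips since only the string travels through pickle.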
The class itself is simply a wrapper for pb2 files:
import json

from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.json_format import MessageToJson
from ProtoFetcher.PROTO_OUT import example_message_pb2
 
 
# Hard-coded list of implemented messages. A single proto can contain many useful messages.
known_messages = {
    "example_message": {
        "proto_source": "example_message.proto",
        "class": example_message_pb2,
        "proto_messagename": "ExampleMessageList",
        "proto_sub_messagename": "ExampleMessageValues",
    },
}
 
 
class ProtoFetcher:
    """A class to represent a proto message. A wrapper for google.protobuf.
 
    Parameters
    ----------
    messagename : str
        the common name of the service.
 
    Examples
    --------
    >>> service = ProtoFetcher('example_message')
    >>> serde = service.get_deserializer()
    >>> 'some_field_that_is_part_of_the_proto_message' in serde.DESCRIPTOR.fields_by_name.keys()
    True
    """
 
    def __init__(self, messagename: str):
        """Constructs the proto message wrapper."""
 
        # Input check
        if messagename not in known_messages.keys():
            raise NameError(f"The service must be one of: {self.list_message_names()}")
 
        # Settings/Info for this messagename from the config
        self.msg_info = known_messages[messagename]
 
        # Initialize the message
        self.deserializer = self._init_deserializer_class()
 
    def _init_deserializer_class(self) -> GeneratedProtocolMessageType:
        """Gets the protocol message type from the Protobuf descriptors (_pb2.py file).
        It can be used for serializing and deserializing the messages.
        """
        # Get
        message = getattr(self.msg_info["class"], self.msg_info["proto_messagename"])
 
        # Some messages include a submessage. This happens at least when
        # a blob field description is a list of lists.
        if self.msg_info["proto_sub_messagename"]:
            message = getattr(message, self.msg_info["proto_sub_messagename"])
 
        return message
 
    def get_deserializer(self) -> GeneratedProtocolMessageType:
        """Getter for the message that can be used for serializing and deserializing the messages."""
        return self.deserializer
 
    def blob_to_json(self, blob) -> str:
        """Converts the bytes-like BLOB object to a human-readable form.
 
        Parameters
        ----------
        blob : str/bytes
            bytes-like object or ASCII string including the contents of the blob field.
 
        Returns
        -------
        j : str
            Stringified JSON containing the deserialized message data.
        """
 
        # Deserialize the proto
        deserializer = self.get_deserializer()
        message = deserializer.FromString(blob)
 
        # Convert to JSON and clean it
        my_message_as_json = MessageToJson(message)
        j = json.dumps(json.loads(my_message_as_json))
        return j
04-05-2022 03:12 AM
Your article content is extremely fascinating. I'm extremely intrigued with your post. I desire to get more incredible posts.
08-31-2023 04:12 PM
We've now added a native connector that parses Protobuf directly with Spark DataFrames.
https://docs.databricks.com/en/structured-streaming/protocol-buffers.html
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
    input_df
    .select(
        from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
    proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)