09-15-2021 03:36 AM
Hi,
Let's assume I have these things:
What different approaches have Databricks users chosen to deserialize the data? Python is the programming language that I am mostly familiar with, so anything that can be achieved using pyspark would be great.
Locally, my approach would be to use the grpc_tool.protoc to generate the pb2 files (message.proto -> message_pb2.python). I would later on import these classes and use the approariate message to deserialize the binary data. Example code below:
import os
import pkg_resources
from grpc_tools import protoc
# Verbose
print(f"[Building] {protopath}")
# For some reason, grpc_tools.protoc doesn't include _proto module which causes errors:
# "google.protobuf.Timestamp" is not defined.
path_to_module = pkg_resources.resource_filename('grpc_tools', '_proto')
# Flags
args = (
'grpc_tools.protoc', # args 0 is not needed
f"--proto_path={PROTO_PATH}",
f"--python_out={OUTPUT_PATH}",
f"-I{path_to_module}",
# f"--grpc_python_out={OUTPUT_PATH}",
protopath.split("/")[-1]
)
protoc.main(args)
My current ideas are:
Is there any other way of using the deserializer with Spark? Something a bit less manual?
10-28-2021 10:56 PM
In case this helps someone else, I will leave the code here for reference that I use for running the method in Databricks inside a UDF.
from typing import Iterator
@pandas_udf("string")
def my_test_function(blobs: Iterator[pd.Series]) -> Iterator[pd.Series]:
d = ProtoFetcher("my_message_name")
for blob in blobs:
yield blob.apply(d.blob_to_json)
new_df = df.withColumn("col_as_json", my_test_function("original_col"))
For a small dataset, this performed just as the non-Pandas UDF, but I suppose this will change when I scale the dataset up.
09-29-2021 11:49 PM
@Kaniz Fatma, the community hasn't provided answers yet. Similar question has appeared in the Databricks Forums earlier and there are no ideas either. Do you have any ideas how to perform this deserialization efficiently in Databricks? Can we somehow give Spark a protobuf file/message that it would use as a serde for a given column?
09-30-2021 10:29 AM
@Jani Sourander - Following up on Kaniz's answer, we have escalated the issue to the proper team. As she said, they'll get back to you as soon as they can.
10-11-2021 04:41 PM
Hi @Jani Sourander ,
I found this library sparksql-protobuf it might work. it has not been updated in a while.
Thank you
10-11-2021 11:11 PM
Hi @Jose Gonzalez and thank you for the reply! That library seems out of date and lacks documentation. I wonder if ScalaPB would be a better option. I don't have any scala experience, but since scala UDF's perform well with Spark. I suppose learning Scala and taking this approach would be an option. There would be some other benefits too (such as writing other Databricks UDF's in Scala for performance increasement)
It is starting to seem that the best option for MVP is actually the Python-based option that is used as a wheel library in Databricks.
10-20-2021 02:53 PM
I would definitely go with a Python option here, that way you can use PandasUDFs which are faster than scala UDFs. Happy coding!
10-28-2021 06:18 AM
@Dan Zafar do you have any ideas on how would I optimize the query that requires an access to a class method? Details below.
I created a class ProtoFetcher that hosts various proto_pb2.py files create using protoc.tools (similarly as in the original post). It can be instantiated by giving a name of the data; internally it imports the correct some_proto_pb2 class, assigns it to class variables and uses the get_attr("some_proto_pb2", "name_of_this_message") to fetch the correct GeneratedProtocolMessageType (which is defined in google.protobuf.pyext.cpp_message).
Sadly, I can't seem to find a way to use the UDF so that the class initialisation wouldn't be inside the function. If I'm not completely wrong, this means that the class will get initialised for each row as a loop. Trying to access the method from outside the udf definition will raise a "PicklingError: Could not serialize object: TypeError: cannot pickle 'google.protobuf.pyext._message.MessageDescriptor' object"
@udf("string")
def my_test_func(blob):
d = ProtoFetcher("name_of_this_data")
return d.blob_to_json(blob)
new_df = df.withColumn("blob_as_json", my_test_func("original_blob_col"))
Running a display() on that new_df takes about 40 seconds, and this test file has only 600 rows. I also tried the pandas_udf approch and got the similar results. I defined the pandas_udf as:
@pandas_udf("string")
def my_test_function(s: pd.Series) -> pd.Series:
d = ProtoFetcher("name_of_this_data")
s_json = s.apply(d.blob_to_json)
return s_json
NOTE: The blob column has a fair amount of data, though. Running the same code using local Pandas installation will take 32 seconds on a fairly powerful laptop. This was being run with a code:
df['new_col'] = df['original_col'].apply(d.blob_to_json)
10-28-2021 08:32 AM
Just use an iterator of Series UDF. See here: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html#iterator-of-series-to-iter...
This allows you to set up some pre-defined state (like loading a file) before doing the computation.
10-28-2021 10:56 PM
In case this helps someone else, I will leave the code here for reference that I use for running the method in Databricks inside a UDF.
from typing import Iterator
@pandas_udf("string")
def my_test_function(blobs: Iterator[pd.Series]) -> Iterator[pd.Series]:
d = ProtoFetcher("my_message_name")
for blob in blobs:
yield blob.apply(d.blob_to_json)
new_df = df.withColumn("col_as_json", my_test_function("original_col"))
For a small dataset, this performed just as the non-Pandas UDF, but I suppose this will change when I scale the dataset up.
10-29-2021 02:41 PM
Thanks @Jani Sourander , you can mark this as 'Best Answer' so that the question is resolved and answer is easy to find for future users.
04-01-2022 08:51 AM
@Jani Sourander would you mind posting the logic that you used in your ProtoFetched class, I am running into the same cannot pickle issue for a protobuf pb2.py file and attempted to re-create the ProtoFetched class and the my_test_function but am still receiving the error.
Thank you
04-03-2022 10:41 PM
Hi,
The ProtoFetcher is just a wrapper for the pb2 file(s) created using the protoc. As of now, I am using Scala to do the same trick (it was about 5x faster).
To avoid the pickling problems, you need to build the ProtoFetcher library into a wheel file on a non-Databricks cluster (e.g. your laptop.) I personally prefer Python Poetry for managing libraries and the build process. Then upload this wheel to S3 and use is as a cluster library in Databricks.
The class itself is simply a wrapper for pb2 files:
from google.protobuf.pyext.cpp_message import GeneratedProtocolMessageType
from google.protobuf.json_format import MessageToJson
from ProtoFetcher.PROTO_OUT import example_message_pb2
# Hard-coded list of implemented messages. A single proto can contain many useful messages.
known_messages = {
"example_message": {
"proto_source": "example_message.proto",
"class": example_message_pb2,
"proto_messagename": "ExampleMessageList",
"proto_sub_messagename": "ExampleMessageValues",
},
}
class ProtoFetcher:
"""A class to represent a proto message. A wrapper for google.protobuf.
Parameters
----------
messagename : str
the common name of the service.
Examples
--------
>>> service = ProtoFetcher('example_message')
>>> serde = service.get_deserializer()
>>> 'some_field_that_is_part_of_the_proto_message' in serde.DESCRIPTOR.fields_by_name.keys()
True
"""
def __init__(self, messagename: str):
"""Constructs the proto message wrapper."""
# Input check
if messagename not in known_messages.keys():
raise NameError(f"The service must be one of: {self.list_message_names()}")
# Settings/Info for this messagename from the config
self.msg_info = known_messages[messagename]
# Initialize the message
self.deserializer = self._init_deserializer_class()
def _init_deserializer_class(self) -> GeneratedProtocolMessageType:
"""Gets the protocol message type from the Protobuf descriptors (_pb2.py file).
It can be used for serializing and deserializing the messages.
"""
# Get
message = getattr(self.msg_info["class"], self.msg_info["proto_messagename"])
# Some messages include a submessage. This happens at least when
# a blob field description is a list of lists.
if self.msg_info["proto_sub_messagename"]:
message = getattr(message, self.msg_info["proto_sub_messagename"])
return message
def get_deserializer(self) -> GeneratedProtocolMessageType:
"""Getter for the message that can be used for serializing and deserializing the messages."""
return self.deserializer
def blob_to_json(self, blob) -> str:
"""Converts the bytes-like BLOB object to a the human-readable form.
Parameters
----------
blob : str/bytes
bytes-like object or ASCII string including the contents of the blob field.
Returns
-------
j : str
Stringified JSON containing the deserialized message data.
"""
# Deserialize the proto
deserializer = self.get_deserializer()
message = deserializer.FromString(blob)
# Convert to JSON and clean it
my_message_as_json = MessageToJson(message)
j = json.dumps(json.loads(my_message_as_json))
return j
04-05-2022 03:12 AM
Your article content is extremely fascinating. I'm extremely intrigued with your post. I desire to get more incredible posts.
08-31-2023 04:12 PM
We've now added a native connector with parsing directly with Spark Dataframes.
https://docs.databricks.com/en/structured-streaming/protocol-buffers.html
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf schema_registry_options = { "schema.registry.subject" : "app-events-value", "schema.registry.address" : "https://schema-registry:8081/" } # Convert binary Protobuf to SQL struct with from_protobuf(): proto_events_df = ( input_df .select( from_protobuf("proto_bytes", options = schema_registry_options) .alias("proto_event") ) ) # Convert SQL struct to binary Protobuf with to_protobuf(): protobuf_binary_df = ( proto_events_df .selectExpr("struct(name, id, context) as event") .select( to_protobuf("event", options = schema_registry_options) .alias("proto_bytes") ) )
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group