
Handling Schema Mismatch in DLT Pipeline with from_protobuf Function

serelk
New Contributor III

Hello Databricks Community,

I'm working with a DLT pipeline where I consume Protobuf-serialized messages and decode them with Spark's from_protobuf function. My view function, kafka_msg_dlt_view, is outlined below:

import logging

from pyspark.sql.functions import lit
from pyspark.sql.protobuf.functions import from_protobuf


def kafka_msg_dlt_view():
    desc_file_path = "xxxxxx"
    message_name = "yyyyyyyy"

    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    try:
        dfr = df.select(from_protobuf(df.value, message_name, desc_file_path).alias("msg"))
        logging.info("Finished parsing")
        return dfr.withColumn("p_status", lit("ok"))
    except Exception as e:
        logging.error(f"Got exception {e}")
        return df.withColumn("p_status", lit("parsing_error"))

The challenge arises when there is a schema mismatch: the DLT pipeline fails with a Spark exception that is not caught by the Python try-except block. Because Spark builds the query lazily, from_protobuf only executes on the executors once the stream actually runs, so the driver-side try-except around the DataFrame definition never fires. The error message suggests switching the mode to PERMISSIVE, but my attempts to set it appeared to have no effect on from_protobuf's behavior.

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 1cd0bea3-22d3-4567-975b-9f71efd54c64, runId = 8667b3c9-6665-47fb-8e51-cb7563e6819c] terminated with exception: Job aborted due to stage failure: Task 0 in stage 286.0 failed 4 times, most recent failure: Lost task 0.3 in stage 286.0 (TID 419) (10.0.128.12 executor 0): org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'.

Has anyone encountered this issue, or does anyone have insight into handling schema mismatches gracefully within a DLT pipeline when using from_protobuf? Any advice on making the error handling work as intended would be greatly appreciated.

Thank you in advance for your help!


2 REPLIES

Faisal
Contributor

One possible solution is to handle the deserialization of the Protobuf messages yourself. Rather than deserializing in the source, consume the Kafka value as raw bytes (a ByteArrayDeserializer, in Kafka-client terms) and decode each record in your own code, where failures can be caught per record. This approach might let you handle schema mismatches more gracefully; a sketch of the idea follows.
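A minimal sketch of that idea in PySpark, since Spark's Kafka source already delivers value as raw bytes: decode each record in a Python UDF using a protoc-generated class, returning null for malformed payloads instead of failing the whole stream. The module my_pb2 and class MyMessage are hypothetical stand-ins for your own generated schema.

from google.protobuf.json_format import MessageToJson
from google.protobuf.message import DecodeError
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

import my_pb2  # hypothetical protoc-generated module


@udf(returnType=StringType())
def decode_or_null(raw):
    # Return the decoded message as JSON text, or None if the payload is malformed.
    if raw is None:
        return None
    try:
        msg = my_pb2.MyMessage()
        msg.ParseFromString(raw)
        return MessageToJson(msg)
    except DecodeError:
        return None  # malformed record: flag it downstream instead of aborting


df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
decoded = df.withColumn("msg_json", decode_or_null(col("value")))

The trade-off is that a row-level Python UDF gives per-record error handling but loses the native performance and typed struct output of from_protobuf.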

serelk (Accepted Solution)
New Contributor III

I finally found the appropriate way to configure PERMISSIVE mode: pass it in the options dictionary of from_protobuf itself. With this setup, corrupted protobuf messages are returned as null results instead of throwing an exception:

.withColumn("msg", from_protobuf(col("value"), message_name, desc_file_path, {"mode": "PERMISSIVE"}))
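For completeness, here is a minimal sketch of how that option could slot back into the original view. It assumes, per the error message, that PERMISSIVE mode yields a null msg for malformed records, and it uses a null check in place of the original try-except to set p_status; that flagging logic is my addition, not part of the accepted answer.

import dlt
from pyspark.sql.functions import col, lit, when
from pyspark.sql.protobuf.functions import from_protobuf


@dlt.view
def kafka_msg_dlt_view():
    desc_file_path = "xxxxxx"   # descriptor file path (elided in the post)
    message_name = "yyyyyyyy"   # protobuf message name (elided in the post)

    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    return (
        df.withColumn(
            "msg",
            from_protobuf(col("value"), message_name, desc_file_path,
                          {"mode": "PERMISSIVE"}),
        )
        # PERMISSIVE returns null for malformed records, so flag them here
        .withColumn(
            "p_status",
            when(col("msg").isNull(), lit("parsing_error")).otherwise(lit("ok")),
        )
    )

One caveat: a null msg can also mean the Kafka value itself was null, so you may want to distinguish those cases before treating every null as a parse failure.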
