Handling Schema Mismatch in DLT Pipeline with from_protobuf Function

serelk
New Contributor III

Hello Databricks Community,

I'm working with a DLT pipeline where I consume Protobuf-serialized messages and attempt to decode them with Spark's from_protobuf function. My view function, kafka_msg_dlt_view, is outlined as follows:

import logging

from pyspark.sql.functions import lit
from pyspark.sql.protobuf.functions import from_protobuf

def kafka_msg_dlt_view():
    desc_file_path = "xxxxxx"  # descriptor file path (redacted)
    message_name = "yyyyyyyy"  # Protobuf message name (redacted)

    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    try:
        dfr = df.select(from_protobuf(df.value, message_name, desc_file_path).alias("msg"))
        logging.info("Finished parsing")
        return dfr.withColumn("p_status", lit("ok"))
    except Exception as e:
        logging.error(f"Got exception {e}")
        return df.withColumn("p_status", lit("parsing_error"))

The challenge arises when there is a schema mismatch: the DLT pipeline fails with a Spark exception that is not caught by the Python try/except block. The error message suggests switching the mode to PERMISSIVE, but trying this appears to have no effect on the behavior of from_protobuf.

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 1cd0bea3-22d3-4567-975b-9f71efd54c64, runId = 8667b3c9-6665-47fb-8e51-cb7563e6819c] terminated with exception: Job aborted due to stage failure: Task 0 in stage 286.0 failed 4 times, most recent failure: Lost task 0.3 in stage 286.0 (TID 419) (10.0.128.12 executor 0): org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'.
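(For context, and likely why the except branch never fires: Spark transformations are evaluated lazily, so the from_protobuf call only adds a step to the streaming plan; the actual byte parsing, and hence the MALFORMED_PROTOBUF_MESSAGE error, happens on the executors once the query runs. A minimal illustrative sketch, not code from the original post:)

    # select() returns immediately after building a lazy plan;
    # no Protobuf bytes are parsed at this point.
    try:
        dfr = df.select(from_protobuf(df.value, message_name, desc_file_path).alias("msg"))
    except Exception:
        # Never reached for malformed payloads: deserialization runs on the
        # executors when the streaming query starts, not at plan-construction time.
        raise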

Has anyone encountered this issue, or does anyone have insight into handling schema mismatches gracefully within a DLT pipeline when using from_protobuf? Any advice on making the error handling work as intended would be greatly appreciated.

Thank you in advance for your help!

1 ACCEPTED SOLUTION


serelk
New Contributor III

I finally found the appropriate method for configuring PERMISSIVE mode. With this setting, corrupted Protobuf messages are decoded to null instead of throwing an exception:

.withColumn("msg", from_protobuf(col("value"), message_name, desc_file_path, {"mode": "PERMISSIVE"}))
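For context, a minimal sketch of how this could look in the DLT view from the question. The options dictionary is part of from_protobuf's signature; the @dlt.view decorator and the p_status flag are assumptions layered on the original snippet:

    import dlt
    from pyspark.sql.functions import col, lit, when
    from pyspark.sql.protobuf.functions import from_protobuf

    @dlt.view
    def kafka_msg_dlt_view():
        desc_file_path = "xxxxxx"  # redacted in the original post
        message_name = "yyyyyyyy"  # redacted in the original post

        df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
        # PERMISSIVE mode turns malformed payloads into null results instead
        # of failing the stream, so they can be flagged rather than killing the job.
        decoded = df.withColumn(
            "msg",
            from_protobuf(col("value"), message_name, desc_file_path, {"mode": "PERMISSIVE"}),
        )
        return decoded.withColumn(
            "p_status",
            when(col("msg").isNull(), lit("parsing_error")).otherwise(lit("ok")),
        )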


2 REPLIES

Faisal
Contributor

One possible solution could be to handle the deserialization of the Protobuf messages yourself. Instead of deserializing in Spark, consume the raw bytes with a ByteArrayDeserializer and convert them in your own consumer code (with a ByteArraySerializer on the producing side). This approach might allow you to handle schema mismatches more gracefully.
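A rough sketch of that idea adapted to Structured Streaming, where the Kafka source already delivers value as raw bytes. MyMessage and my_proto_pb2 are hypothetical stand-ins for a protoc-generated Python class; none of this code is from the original thread:

    from google.protobuf.json_format import MessageToJson
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    from my_proto_pb2 import MyMessage  # hypothetical protoc-generated class

    @udf(StringType())
    def decode_or_null(value):
        # Decode the Protobuf bytes by hand so a mismatched or corrupted
        # payload yields null instead of failing the whole stream.
        try:
            msg = MyMessage()
            msg.ParseFromString(bytes(value))
            return MessageToJson(msg)
        except Exception:
            return None

    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    decoded = df.withColumn("msg", decode_or_null(df.value))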
