Hello Databricks Community,
I'm working with a DLT pipeline where I consume Protobuf-serialized messages from Kafka and decode them using Spark's from_protobuf function. My kafka_msg_dlt_view function is as follows:
import dlt
import logging
from pyspark.sql.functions import lit
from pyspark.sql.protobuf.functions import from_protobuf

@dlt.view
def kafka_msg_dlt_view():
    desc_file_path = "xxxxxx"   # path to the compiled Protobuf descriptor file
    message_name = "yyyyyyyy"   # name of the Protobuf message type to decode
    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    try:
        # Decode the binary Kafka value column using the descriptor file
        dfr = df.select(from_protobuf(df.value, message_name, desc_file_path).alias("msg"))
        logging.info("Finished parsing")
        return dfr.withColumn("p_status", lit("ok"))
    except Exception as e:
        logging.error(f"Got exception {e}")
        return df.withColumn("p_status", lit("parsing_error"))
The problem arises when there is a schema mismatch: the DLT pipeline fails with a Spark exception that the Python try-except block never catches, presumably because from_protobuf is evaluated lazily on the executors at stream execution time rather than when the DataFrame is defined. The error message suggests setting the mode to PERMISSIVE, but when I try that, it appears to have no effect on the behavior of from_protobuf.
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 1cd0bea3-22d3-4567-975b-9f71efd54c64, runId = 8667b3c9-6665-47fb-8e51-cb7563e6819c] terminated with exception: Job aborted due to stage failure: Task 0 in stage 286.0 failed 4 times, most recent failure: Lost task 0.3 in stage 286.0 (TID 419) (10.0.128.12 executor 0): org.apache.spark.SparkException: [MALFORMED_PROTOBUF_MESSAGE] Malformed Protobuf messages are detected in message deserialization. Parse Mode: FAILFAST. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'.
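In case I am simply passing the option incorrectly, this is the variant I tried (same placeholder descriptor path and message name as above), using the options parameter of from_protobuf:

    # Attempted workaround: PERMISSIVE mode should yield null for malformed
    # messages instead of failing the stream, but it did not change anything.
    dfr = df.select(
        from_protobuf(
            df.value,
            message_name,
            desc_file_path,
            options={"mode": "PERMISSIVE"},
        ).alias("msg")
    )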
Has anyone encountered this issue, or does anyone have insight into handling schema mismatches gracefully within a DLT pipeline when using from_protobuf? Any advice on making the error handling work as intended would be greatly appreciated.
Thank you in advance for your help!