09-12-2024 05:31 AM
Greetings.
We currently have a Spark Structured Streaming job (Scala) retrieving Avro data from an Azure Event Hub with a Confluent schema registry endpoint (behind an Azure API Management gateway with certificate authentication).
Until now, the .jks files used by the Databricks consumer were retrieved by mounting the storage account into the Databricks workspace, with the from_avro() options configured as follows:
val fromAvroOptions = new java.util.HashMap[String, String]()
fromAvroOptions.put("mode", "PERMISSIVE")
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.location", "/dbfs/mnt/keystores/Client_Cert.truststore.jks")
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.password", truststorePass)
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.location", "/dbfs/mnt/keystores/Client_Cert.keystore.jks")
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.password", keystorePass)
fromAvroOptions.put("confluent.schema.registry.ssl.key.password", keyPass)
We decided to migrate the storage account to Unity Catalog external volumes in order to access the .jks files (ref), which is supposed to work.
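After the migration, the truststore/keystore location options point at the volume paths, roughly as follows (the catalog/schema/volume names here are illustrative):
// Illustrative Unity Catalog volume paths; the actual catalog/schema/volume names differ.
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.location",
  "/Volumes/<catalog>/<schema>/<volume>/keystores/Client_Cert.truststore.jks")
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.location",
  "/Volumes/<catalog>/<schema>/<volume>/keystores/Client_Cert.keystore.jks")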
The initial handshake and authentication are achieved, and a successful request is logged in the APIM logs.
However, while trying to display the data, an error is thrown.
The compute configuration we used is the following:
Unity Catalog enabled single user access cluster (single node 14.3 LTS)
+ com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22.
Full privileges have also been granted to the user at the catalog, schema, and volume levels.
When attempting to read the data with a Kafka consumer, no exception is thrown to the application, but errors appear in the log4j output and no messages can be decoded.
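For completeness, the Kafka-protocol read of the same Event Hub in Structured Streaming looks roughly like this (namespace, topic, and secret names are placeholders):
// Event Hubs exposes a Kafka-compatible endpoint on port 9093 (SASL_SSL / PLAIN with the connection string).
// On Databricks the Kafka client classes are shaded, hence the kafkashaded prefix in the JAAS config.
val namespace = "<eventhubs-namespace>"
val connectionString = dbutils.secrets.get("eh-secrets", "connection-string")  // hypothetical secret scope/key
val rawDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", s"$namespace.servicebus.windows.net:9093")
  .option("subscribe", "<eventhub-name>")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
    s"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$connectionString";""")
  .option("startingOffsets", "earliest")
  .load()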
Using a Shared cluster with 15.4 LTS seems to yield similar errors.
Any help would be appreciated. Thanks in advance.
Monday
Thanks for the detailed context—here’s a concise, actionable troubleshooting plan tailored to Databricks with Unity Catalog volumes and Avro + Confluent Schema Registry over APIM with mTLS.
Based on your description, the initial TLS handshake succeeds (APIM logs show a successful request), but message decoding fails in Spark/Kafka consumers, with errors surfacing only in logs on 14.3 LTS single-user and similar behavior on 15.4 LTS shared clusters. That pattern typically points to one of the following:
Avro conversion path not finding/using the keystore/truststore correctly when reading from Unity Catalog volumes, especially if the path or volume access semantics differ from DBFS mount assumptions. This often shows up after handshake success if the schema registry client can connect but cannot complete certificate chain validation or client auth for subsequent calls due to path resolution or scope issues. I couldn't open the Databricks doc that describes using a truststore/keystore from Unity Catalog volumes, so I can't verify the exact syntax from that page right now.
Schema registry auth header or SSL options mismatch for the Confluent client inside from_avro, particularly when using APIM as a gateway. If APIM requires a client certificate, the Confluent Avro deserializer must be given the correct SSL properties, and they must be discoverable on executors.
Classpath/library version incompatibility between the Event Hubs connector, Spark's built-in Avro support, and the schema registry client used by from_avro. This can lead to silent deserialization failures logged in log4j but not thrown to the application, especially with shaded/relocated dependencies.
Executor access to the keystore/truststore files within UC volumes. The driver may access them, but executors might fail if paths aren't accessible in the same way or if you're referencing a local filesystem path that isn't distributed.
Please try these in order; they’re low-risk and address the most common causes.
Validate the path syntax for Unity Catalog volumes (/Volumes/<catalog>/<schema>/<volume>/…) and use a path that executors can read:
Prefer reading keystore/truststore into the driver/executors’ local filesystem from the UC volume before initializing from_avro, rather than pointing the Confluent client to a UC volume path directly. For example:
Copy the .jks files to /local_disk/… and reference those paths in fromAvroOptions:
dbutils.fs.cp("/Volumes/<catalog>/<schema>/<volume>/keystores/Client_Cert.truststore.jks", "file:/local_disk/keystores/Client_Cert.truststore.jks")
Then set:
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.location", "/local_disk/keystores/Client_Cert.truststore.jks")
This sidesteps any path scheme issues with UC volumes on executors.
Confirm the exact options keys used by from_avro’s Confluent client:
- confluent.schema.registry.url
- confluent.schema.registry.basic.auth.credentials.source (if APIM needs credential headers)
- confluent.schema.registry.ssl.keystore.type and confluent.schema.registry.ssl.truststore.type set explicitly to JKS
- confluent.schema.registry.ssl.endpoint.identification.algorithm set to HTTPS or to empty ("") depending on APIM's certs/SANs; an empty value bypasses hostname verification for testing only, then fix the certs/SANs properly.
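If APIM also expects registry credentials in headers, the Confluent client's basic-auth properties can in principle be forwarded through the same prefix; the user.info key below follows the standard Confluent client property names and is an assumption here, as are registryUser/registryPass:
// Assumption: standard Confluent basic-auth properties passed through the confluent.schema.registry. prefix.
fromAvroOptions.put("confluent.schema.registry.basic.auth.credentials.source", "USER_INFO")
fromAvroOptions.put("confluent.schema.registry.basic.auth.user.info", s"$registryUser:$registryPass")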
Ensure executors receive the passwords securely:
- Keep the fromAvroOptions values set on the driver and broadcast to executors; avoid relying only on driver-local variables without proper serialization (see the sketch below).
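One way to keep the passwords out of notebooks while still reaching the executors is to read them from a Databricks secret scope on the driver before building fromAvroOptions; the scope and key names here are placeholders:
// Hypothetical secret scope and key names; replace with your own.
val truststorePass = dbutils.secrets.get(scope = "jks-secrets", key = "truststore-password")
val keystorePass   = dbutils.secrets.get(scope = "jks-secrets", key = "keystore-password")
val keyPass        = dbutils.secrets.get(scope = "jks-secrets", key = "key-password")
// Plain strings placed into fromAvroOptions are serialized to executors along with the from_avro expression.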
Check library alignment:
- Ensure org.apache.spark:spark-avro and the Confluent deserializers are compatible.
- Prefer the runtime's built-in spark-avro instead of adding another Avro jar unless necessary.
Verify schema registry calls via APIM with mTLS outside Spark, using the same keystore/truststore, to separate gateway/certificate issues from Spark issues; a sketch follows below.
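A driver-side sketch of such a standalone check, assuming the .jks files were already copied to /local_disk as above (same password variables as before) and that the registry exposes the usual /subjects endpoint through APIM; adjust the path if APIM rewrites URLs:
// Call the registry through APIM with mTLS, outside Spark, using the same JKS files.
import java.io.FileInputStream
import java.net.URL
import java.security.KeyStore
import javax.net.ssl.{HttpsURLConnection, KeyManagerFactory, SSLContext, TrustManagerFactory}

val clientKs = KeyStore.getInstance("JKS")
clientKs.load(new FileInputStream("/local_disk/keystores/keystore.jks"), keystorePass.toCharArray)
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(clientKs, keyPass.toCharArray)

val trustKs = KeyStore.getInstance("JKS")
trustKs.load(new FileInputStream("/local_disk/keystores/truststore.jks"), truststorePass.toCharArray)
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(trustKs)

val ssl = SSLContext.getInstance("TLS")
ssl.init(kmf.getKeyManagers, tmf.getTrustManagers, null)

val conn = new URL("https://<your-apim-endpoint>/subjects").openConnection().asInstanceOf[HttpsURLConnection]
conn.setSSLSocketFactory(ssl.getSocketFactory)
// conn.setRequestProperty("Ocp-Apim-Subscription-Key", "<key>")  // only if your APIM product requires it
val code = conn.getResponseCode
println(s"HTTP $code")
val body = Option(if (code < 400) conn.getInputStream else conn.getErrorStream)
body.foreach(s => println(scala.io.Source.fromInputStream(s).mkString))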
Inspect logs for specific errors in the executors:
Raise the log level for io.confluent and org.apache.avro to catch exact deserialization failures (e.g., unknown schema ID, incompatible reader/writer schema, or SSL handshake failures on subsequent calls); a sketch follows below.
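On recent runtimes (Log4j 2) the driver-side levels can be raised from a notebook; executors need the same change via the cluster's log4j configuration or an init script. A minimal sketch:
// Raises log levels in the current (driver) JVM only; configure executors separately.
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

Configurator.setLevel("io.confluent", Level.DEBUG)
Configurator.setLevel("org.apache.avro", Level.DEBUG)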
If you're using Event Hubs Capture Avro vs the Kafka APIs, ensure the Avro binary is compatible with from_avro:
The from_avro function expects the raw binary payload and valid schema resolution via the registry; confirm the messages are not JSON-encoded Avro and do not carry envelopes that need custom parsing before from_avro.
The following example copies the files out of the UC volume first; this pattern avoids path scheme surprises and ensures executor access.
// Example: copy keystore/truststore from UC volume to local disk
dbutils.fs.mkdirs("file:/local_disk/keystores")
dbutils.fs.cp("uc://<catalog>.<schema>.<volume>/keystores/Client_Cert.truststore.jks", "file:/local_disk/keystores/truststore.jks")
dbutils.fs.cp("uc://<catalog>.<schema>.<volume>/keystores/Client_Cert.keystore.jks", "file:/local_disk/keystores/keystore.jks")
val fromAvroOptions = new java.util.HashMap[String, String]()
fromAvroOptions.put("mode", "PERMISSIVE")
fromAvroOptions.put("confluent.schema.registry.url", "<https://your-apim-endpoint>")
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.location", "/local_disk/keystores/truststore.jks")
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.password", truststorePass)
fromAvroOptions.put("confluent.schema.registry.ssl.truststore.type", "JKS")
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.location", "/local_disk/keystores/keystore.jks")
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.password", keystorePass)
fromAvroOptions.put("confluent.schema.registry.ssl.keystore.type", "JKS")
fromAvroOptions.put("confluent.schema.registry.ssl.key.password", keyPass)
// Optional if APIM certs/SANs are non-standard; remove once certs are fixed
// fromAvroOptions.put("confluent.schema.registry.ssl.endpoint.identification.algorithm", "")
// When creating the DF from Event Hubs payload, ensure the value column is raw binary for from_avro
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._

// Databricks Runtime's schema-registry-aware from_avro overload:
// from_avro(data, subject, schemaRegistryAddress, options)
val schemaRegistryAddr = "https://<your-apim-endpoint>"
val df = rawDf.select(
  from_avro(col("value"), "<subject-name>", schemaRegistryAddr, fromAvroOptions).as("decoded"))
If you are using subject name strategy instead of direct schema ID, ensure the option to resolve subjects is correctly set and that APIM forwards necessary paths.
Collect and share the following for precise root cause:
- The exact options map passed to from_avro (masking secrets), including the URL and any additional SSL properties.
- The executor log output captured when calling from_avro.