Marcin
Databricks Employee

If you are using Confluent with Schema Registry, you can use the code below. No additional libraries need to be installed. Starting with Databricks Runtime 16.0, schema references and recursive references are also supported:

 

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

# Read Avro messages from a Confluent Kafka topic and decode them with the
# schema stored in Confluent Schema Registry.
#
# Replace every placeholder value below with your own settings. For real
# workloads, keep API keys and secrets out of the notebook source (for example
# in a Databricks secret scope) instead of hard-coding them here.

topic = "topic_name"
# Schema Registry subject holding the value schema for this topic, named "<topic>-value".
schema_registry_subject = f"{topic}-value"
schema_registry_url = 'schema_registry_url'
schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
    "confluent.schema.registry.basic.auth.user.info": 'schema_registry_api_key:schema_registry_api_secret',
}

# Kafka connection settings (placeholders).
bootstrap_servers = "bootstrap_servers"  # comma-separated host:port list of the brokers
sasl_api_key = "sasl_api_key"
sasl_api_secret = "sasl_api_secret"

# JAAS login for SASL/PLAIN. The `kafkashaded.` prefix refers to the shaded
# Kafka client bundled with the Databricks Runtime.
sasl_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{sasl_api_key}" password="{sasl_api_secret}";'
)

# Stream the raw Kafka records; the Avro payload is in the binary `value` column.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", sasl_jaas_config)
    .load()
)

# Decode `value` with the schema that from_avro looks up in Schema Registry
# for `schema_registry_subject`, authenticating with `schema_registry_options`.
df = df.select(
    from_avro(
        data=col("value"),
        options=schema_registry_options,
        subject=schema_registry_subject,
        schemaRegistryAddress=schema_registry_url,
    ).alias("value")
)

display(df)

 

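For more information, see the Databricks documentation on authenticating to an external Confluent Schema Registry:
https://docs.databricks.com/en/structured-streaming/avro-dataframe.html#authenticate-to-an-external-confluent-schema-registry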