02-18-2025 02:02 AM
If you are using Confluent with Schema Registry, you can use the code below. No additional libraries need to be installed. From Databricks Runtime 16.0, it supports schema references and recursive references:
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
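# Placeholders: replace the topic name, Schema Registry URL, and API key/secret with your own values.
topic = "topic_name"
# Subject name for the message value under the default "<topic>-value" naming
schema_registry_subject = f"{topic}-value"
schema_registry_url = 'schema_registry_url'
# Basic-auth credentials for Schema Registry, in "key:secret" form
schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
    "confluent.schema.registry.basic.auth.user.info": 'schema_registry_api_key:schema_registry_api_secret'
}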
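# Placeholder broker list; the original snippet reads it from a `config` dict defined elsewhere.
config = {"bootstrap.servers": "bootstrap_servers"}
# Replace sasl_api_key / sasl_api_secret with the Kafka cluster API key and secret.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["bootstrap.servers"]) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="sasl_api_key" password="sasl_api_secret";') \
    .load()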
df = df.select(
    from_avro(
        data = col("value"),
        options = schema_registry_options,
        subject = schema_registry_subject,
        schemaRegistryAddress = schema_registry_url
    ).alias("value")
)
display(df)
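The two credential strings in the code above (the JAAS config and the Schema Registry user info) are easy to mistype when written inline. A minimal sketch of building them from variables instead is shown here. The helper names `build_jaas_config` and `build_schema_registry_options` are hypothetical and not part of any Databricks or Confluent API; only the option keys and the login module class name come from the code above.

```python
def build_jaas_config(api_key: str, api_secret: str) -> str:
    """Return the SASL/PLAIN JAAS string expected by the kafka.sasl.jaas.config option.

    Hypothetical helper: it only formats the same string used in the snippet above.
    """
    return (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{api_key}" password="{api_secret}";'
    )


def build_schema_registry_options(api_key: str, api_secret: str) -> dict:
    """Return the basic-auth options dict passed to from_avro(options=...).

    Hypothetical helper: the keys are the ones used in the snippet above.
    """
    return {
        "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
        "confluent.schema.registry.basic.auth.user.info": f"{api_key}:{api_secret}",
    }
```

The results can then be passed as .option("kafka.sasl.jaas.config", build_jaas_config(...)) and options = build_schema_registry_options(...), so the key and secret can be loaded from wherever you store credentials rather than hard-coded in the notebook.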
More info here:
https://docs.databricks.com/en/structured-streaming/avro-dataframe.html#authenticate-to-an-external-...