<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Failed to create new KafkaAdminClient in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/68926#M33771</link>
    <description>&lt;P&gt;I want to create connections to kafka with spark.readStream using the following parameters:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;kafkaParams = {
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I get the following error:&lt;EM&gt; kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;The problem is that it doesn't see: org.apache.kafka.common.security.plain.PlainLoginModule&lt;/P&gt;&lt;P&gt;But if I use the Python library (confluent_kafka), everything works correctly with the same parameters inside a Databricks notebook.&lt;/P&gt;&lt;P&gt;Maybe someone has an idea about this?&lt;/P&gt;</description>
    <pubDate>Mon, 13 May 2024 21:25:20 GMT</pubDate>
    <dc:creator>FilipezAR</dc:creator>
    <dc:date>2024-05-13T21:25:20Z</dc:date>
    <item>
      <title>Failed to create new KafkaAdminClient</title>
      <link>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/68926#M33771</link>
      <description>&lt;P&gt;I want to create connections to kafka with spark.readStream using the following parameters:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;kafkaParams = {
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I get the following error:&lt;EM&gt; kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;The problem is that it doesn't see: org.apache.kafka.common.security.plain.PlainLoginModule&lt;/P&gt;&lt;P&gt;But if I use the Python library (confluent_kafka), everything works correctly with the same parameters inside a Databricks notebook.&lt;/P&gt;&lt;P&gt;Maybe someone has an idea about this?&lt;/P&gt;</description>
      <pubDate>Mon, 13 May 2024 21:25:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/68926#M33771</guid>
      <dc:creator>FilipezAR</dc:creator>
      <dc:date>2024-05-13T21:25:20Z</dc:date>
    </item>
    <item>
      <title>Re: Failed to create new KafkaAdminClient</title>
      <link>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/102991#M41291</link>
      <description>&lt;H2&gt;&lt;SPAN class=""&gt;Solution&lt;/SPAN&gt;&lt;/H2&gt;&lt;P class=""&gt;&lt;SPAN class=""&gt;By applying the following measures, it should operate normally:&lt;/SPAN&gt;&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;P class=""&gt;&lt;SPAN class=""&gt;Change &lt;/SPAN&gt;&lt;SPAN class=""&gt;org.apache.kafka.common.security.plain.PlainLoginModule&lt;/SPAN&gt;&lt;SPAN class=""&gt; to &lt;/SPAN&gt;&lt;SPAN class=""&gt;kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P class=""&gt;&lt;SPAN class=""&gt;Modify &lt;/SPAN&gt;&lt;SPAN class=""&gt;kafka.bootstrap.servers&lt;/SPAN&gt;&lt;SPAN class=""&gt; and &lt;/SPAN&gt;&lt;SPAN class=""&gt;kafka.security.protocol&lt;/SPAN&gt;&lt;SPAN class=""&gt; so that they are specified as &lt;/SPAN&gt;&lt;SPAN class=""&gt;.option&lt;/SPAN&gt;&lt;SPAN class=""&gt;.&lt;/SPAN&gt;&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;&lt;SPAN class=""&gt;Steps to Reproduce the Error and How to Address It&lt;/SPAN&gt;&lt;/H2&gt;&lt;P class=""&gt;&lt;SPAN class=""&gt;When I ran the code with your settings exactly as shown below, I got the same error:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; {&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.jaas.config"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;f'org.apache.kafka.common.security.plain.PlainLoginModule required username="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaUsername&lt;/SPAN&gt;}&lt;SPAN class=""&gt;" password="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaPassword&lt;/SPAN&gt;}&lt;SPAN class=""&gt;";'&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.mechanism"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;"PLAIN"&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN 
class=""&gt;"kafka.security.protocol"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;"SASL_PLAINTEXT"&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.bootstrap.servers"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaBootstrapServers&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"subscribe"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaTopic&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"group.id"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaGroupID&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;df&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; (&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;spark&lt;/SPAN&gt;.&lt;SPAN class=""&gt;readStream&lt;/SPAN&gt;.&lt;SPAN class=""&gt;format&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"kafka"&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;options&lt;/SPAN&gt;(&lt;SPAN class=""&gt;**&lt;/SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;load&lt;/SPAN&gt;()&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;display&lt;/SPAN&gt;(&lt;SPAN class=""&gt;df&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;SPAN&gt;kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Manabian_0-1734947830116.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/13662i2B3CD47B78E298D5/image-size/medium?v=v2&amp;amp;px=400" role="button" title="Manabian_0-1734947830116.png" alt="Manabian_0-1734947830116.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN 
class=""&gt;After applying the measures mentioned above, the query started working correctly:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; {&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.jaas.config"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaUsername&lt;/SPAN&gt;}&lt;SPAN class=""&gt;" password="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaPassword&lt;/SPAN&gt;}&lt;SPAN class=""&gt;";'&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.mechanism"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;"PLAIN"&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"subscribe"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaTopic&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"group.id"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaGroupID&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;df&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; (&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;spark&lt;/SPAN&gt;.&lt;SPAN class=""&gt;readStream&lt;/SPAN&gt;.&lt;SPAN class=""&gt;format&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"kafka"&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;option&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"kafka.bootstrap.servers"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;kafkaBootstrapServers&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;option&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"kafka.security.protocol"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;"SASL_SSL"&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;options&lt;/SPAN&gt;(&lt;SPAN 
class=""&gt;**&lt;/SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;load&lt;/SPAN&gt;()&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;display&lt;/SPAN&gt;(&lt;SPAN class=""&gt;df&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Manabian_2-1734947857818.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/13664i3CCB501B1A56D935/image-size/medium?v=v2&amp;amp;px=400" role="button" title="Manabian_2-1734947857818.png" alt="Manabian_2-1734947857818.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN class=""&gt;&lt;BR /&gt;As an additional note, when &lt;/SPAN&gt;&lt;SPAN class=""&gt;kafka.bootstrap.servers&lt;/SPAN&gt;&lt;SPAN class=""&gt; and &lt;/SPAN&gt;&lt;SPAN class=""&gt;kafka.security.protocol&lt;/SPAN&gt;&lt;SPAN class=""&gt; were not specified via &lt;/SPAN&gt;&lt;SPAN class=""&gt;.option&lt;/SPAN&gt;&lt;SPAN class=""&gt;, a timeout error occurred:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; {&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.jaas.config"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaUsername&lt;/SPAN&gt;}&lt;SPAN class=""&gt;" password="&lt;/SPAN&gt;{&lt;SPAN class=""&gt;kafkaPassword&lt;/SPAN&gt;}&lt;SPAN class=""&gt;";'&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.sasl.mechanism"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;"PLAIN"&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.security.protocol"&lt;/SPAN&gt;: &lt;SPAN 
class=""&gt;"SASL_PLAINTEXT"&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"kafka.bootstrap.servers"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaBootstrapServers&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"subscribe"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaTopic&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;"group.id"&lt;/SPAN&gt;: &lt;SPAN class=""&gt;kafkaGroupID&lt;/SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;df&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; (&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp; &amp;nbsp;&lt;SPAN class=""&gt;spark&lt;/SPAN&gt;.&lt;SPAN class=""&gt;readStream&lt;/SPAN&gt;.&lt;SPAN class=""&gt;format&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"kafka"&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;options&lt;/SPAN&gt;(&lt;SPAN class=""&gt;**&lt;/SPAN&gt;&lt;SPAN class=""&gt;kafkaParams&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt; &amp;nbsp;  .&lt;SPAN class=""&gt;load&lt;/SPAN&gt;()&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&lt;SPAN class=""&gt;display&lt;/SPAN&gt;(&lt;SPAN class=""&gt;df&lt;/SPAN&gt;)&lt;/SPAN&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;SPAN&gt;java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 
Call: describeTopics&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Manabian_1-1734947844207.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/13663iE48BA55A2368A180/image-size/medium?v=v2&amp;amp;px=400" role="button" title="Manabian_1-1734947844207.png" alt="Manabian_1-1734947844207.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN class=""&gt;I faced the same error myself. I hope this answer helps many others who are also encountering this issue.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 23 Dec 2024 09:58:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/102991#M41291</guid>
      <dc:creator>Manabian</dc:creator>
      <dc:date>2024-12-23T09:58:36Z</dc:date>
    </item>
    <item>
      <title>Re: Failed to create new KafkaAdminClient</title>
      <link>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/102996#M41293</link>
      <description>&lt;P&gt;The error indicates a missing Kafka client dependency for Spark in Databricks. Ensure the correct Kafka connector library is attached to your Databricks cluster, such as &lt;STRONG&gt;org.apache.spark:spark-sql-kafka-0-10_2.12:x.x.x&lt;/STRONG&gt; (replace x.x.x with your Spark version). Additionally, verify that the kafka.sasl.jaas.config parameter is properly formatted and the required Kafka dependencies (e.g., Kafka clients) are compatible with the Spark version. If the issue persists, check your cluster's runtime environment for any conflicts or missing JARs.&lt;/P&gt;</description>
      <pubDate>Mon, 23 Dec 2024 11:15:29 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/102996#M41293</guid>
      <dc:creator>john533</dc:creator>
      <dc:date>2024-12-23T11:15:29Z</dc:date>
    </item>
    <item>
      <title>Re: Failed to create new KafkaAdminClient</title>
      <link>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/110466#M43582</link>
      <description>&lt;P&gt;If you are using Confluent with Schema Registry, you can use the code below. No additional libraries need to be installed. From Databricks Runtime 16.0 onward, it supports schema references and recursive references:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

topic = "topic_name"
schema_registry_subject = f"{topic}-value"
schema_registry_url = 'schema_registry_url'
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": 'schema_registry_api_key:schema_registry_api_secret'
}

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["bootstrap.servers"]) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="sasl_api_key" password="sasl_api_secret";') \
    .load()

df = df.select(
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = schema_registry_subject,
      schemaRegistryAddress = schema_registry_url
    ).alias("value")
  )

display(df)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;More info here:&lt;BR /&gt;&lt;A href="https://docs.databricks.com/en/structured-streaming/avro-dataframe.html#authenticate-to-an-external-confluent-schema-registry" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/avro-dataframe.html#authenticate-to-an-external-confluent-schema-registry&lt;/A&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 18 Feb 2025 10:02:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/failed-to-create-new-kafkaadminclient/m-p/110466#M43582</guid>
      <dc:creator>Marcin</dc:creator>
      <dc:date>2025-02-18T10:02:52Z</dc:date>
    </item>
  </channel>
</rss>

