cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Failed to create new KafkaAdminClient

FilipezAR
New Contributor

I want to create connections to kafka with spark.readStream using the following parameters:

 

kafkaParams = {
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID
}

 

I get the following error: kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient

The problem is that it doesn't see: org.apache.kafka.common.security.plain.PlainLoginModule

But, if I use the library for python (confluent_kafka) everything works correctly with the same parameters inside Notebook Databricks.

Maybe someone has an idea about this?

2 REPLIES 2

Manabian
New Contributor II

Solution

By applying the following measures, it should operate normally:

  • Change org.apache.kafka.common.security.plain.PlainLoginModule to kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule.

  • Modify kafka.bootstrap.servers and kafka.security.protocol so that they are specified as .option.

Steps to Reproduce the Error and How to Address It

When I ran the code with your settings exactly as shown below, I got the same error:

kafkaParams = {
   "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
   "kafka.sasl.mechanism": "PLAIN",
   "kafka.security.protocol": "SASL_PLAINTEXT",
   "kafka.bootstrap.servers": kafkaBootstrapServers,
   "subscribe": kafkaTopic,
   "group.id": kafkaGroupID,
}
df = (
   spark.readStream.format("kafka")
  .options(**kafkaParams)
  .load()
)
display(df)
kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient

Manabian_0-1734947830116.png

 

After applying the measures mentioned above, the query started working correctly:

kafkaParams = {
   "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
   "kafka.sasl.mechanism": "PLAIN",
   "subscribe": kafkaTopic,
   "group.id": kafkaGroupID,
}
df = (
   spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .options(**kafkaParams)
  .load()
)
display(df)

Manabian_2-1734947857818.png


As an additional note, when
kafka.bootstrap.servers and kafka.security.protocol were not specified via .option, a timeout error occurred:

kafkaParams = {
   "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
   "kafka.sasl.mechanism": "PLAIN",
   "kafka.security.protocol": "SASL_PLAINTEXT",
   "kafka.bootstrap.servers": kafkaBootstrapServers,
   "subscribe": kafkaTopic,
   "group.id": kafkaGroupID,
}
df = (
   spark.readStream.format("kafka")
  .options(**kafkaParams)
  .load()
)
display(df)
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics

Manabian_1-1734947844207.png

 

I faced the same error myself. I hope this answer helps many others who are also encountering this issue.

john533
New Contributor III

The error indicates a missing Kafka client dependency for Spark in Databricks. Ensure the correct Kafka connector library is attached to your Databricks cluster, such as org.apache.spark:spark-sql-kafka-0-10_2.12:x.x.x (replace x.x.x with your Spark version). Additionally, verify that the Kafka.sasl. jars.config parameter is properly formatted and the required Kafka dependencies (e.g., Kafka clients) are compatible with the Spark version. If the issue persists, check your cluster's runtime environment for any conflicts or missing JARs.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group