Manabian
Databricks Partner

Solution

Applying the following two changes should resolve the error:

  • Change org.apache.kafka.common.security.plain.PlainLoginModule to kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule.

  • Specify kafka.bootstrap.servers and kafka.security.protocol through individual .option calls instead of including them in the dictionary passed to .options.
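
As a minimal sketch of the first change, the JAAS string can be built in one place so the shaded class name is always used. The helper names build_jaas_config and shade_login_module are hypothetical and are not part of any Databricks or Kafka API, and the sketch assumes the username and password contain no double quotes:

```python
# Hypothetical helpers for illustration only; not a Databricks or Kafka API.
SHADED_PLAIN_LOGIN_MODULE = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
)


def build_jaas_config(username: str, password: str) -> str:
    """Build a PLAIN JAAS config string that uses the shaded login module."""
    # Assumes neither value contains a double quote; no escaping is done here.
    return (
        f'{SHADED_PLAIN_LOGIN_MODULE} required '
        f'username="{username}" password="{password}";'
    )


def shade_login_module(jaas_config: str) -> str:
    """Prefix an unshaded org.apache.kafka class name with 'kafkashaded.'."""
    if jaas_config.startswith("org.apache.kafka."):
        return "kafkashaded." + jaas_config
    # Already shaded (or some other class entirely): return unchanged.
    return jaas_config
```

The result of build_jaas_config(kafkaUsername, kafkaPassword) can then be used as the value of "kafka.sasl.jaas.config".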

Steps to Reproduce the Error and How to Address It

When I ran the code with your settings exactly as shown below, I got the same error:

# Original settings: unshaded login module, all settings passed via .options
kafkaParams = {
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}
df = (
    spark.readStream.format("kafka")
    .options(**kafkaParams)
    .load()
)
display(df)

Error:
kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient


After applying both changes, the query started working correctly. Note that this working version also sets kafka.security.protocol to SASL_SSL rather than SASL_PLAINTEXT:

# Working settings: shaded login module in the JAAS config
kafkaParams = {
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}
df = (
    spark.readStream.format("kafka")
    # Bootstrap servers and security protocol are set via .option
    .option("kafka.bootstrap.servers", kafkaBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .options(**kafkaParams)
    .load()
)
display(df)
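
The working configuration can also be collected in one function and applied one .option call at a time. This is only a sketch: kafka_reader_options and apply_options are hypothetical names, the SASL_SSL default simply mirrors the working example, and only the connection and authentication settings plus the topic subscription are included:

```python
from typing import Any, Dict

SHADED_PLAIN_LOGIN_MODULE = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
)


def kafka_reader_options(
    bootstrap_servers: str,
    topic: str,
    username: str,
    password: str,
    security_protocol: str = "SASL_SSL",  # value taken from the working example
) -> Dict[str, str]:
    """Collect the Kafka source options used in the working example."""
    jaas = (
        f'{SHADED_PLAIN_LOGIN_MODULE} required '
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": security_protocol,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,
    }


def apply_options(reader: Any, options: Dict[str, str]) -> Any:
    """Apply each setting with its own .option(key, value) call."""
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader


# Usage in a notebook where `spark` is already defined (not run here):
# opts = kafka_reader_options(kafkaBootstrapServers, kafkaTopic,
#                             kafkaUsername, kafkaPassword)
# df = apply_options(spark.readStream.format("kafka"), opts).load()
```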



As an additional note, when I used the shaded class name but still passed kafka.bootstrap.servers and kafka.security.protocol in the dictionary instead of via .option, a timeout error occurred:

# Shaded login module, but servers and protocol are still in the dictionary
kafkaParams = {
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}
df = (
    spark.readStream.format("kafka")
    .options(**kafkaParams)
    .load()
)
display(df)

Error:
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
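
A timeout like this may have more than one cause, so it can help to first rule out basic network reachability from the cluster to the brokers. The following is a rough sketch using only the Python standard library. The function names are hypothetical, and a successful TCP connection says nothing about whether the security protocol or credentials are correct:

```python
import socket
from typing import List, Tuple


def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a 'host1:port1,host2:port2' string into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        if not host:
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


def is_reachable(host: str, port: int, timeout_seconds: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts.
        return False


# Usage in the notebook (not run here):
# for host, port in parse_bootstrap_servers(kafkaBootstrapServers):
#     print(host, port, is_reachable(host, port))
```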


I ran into the same error myself, so I hope this answer helps others who encounter it.