Solution
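Applying the following changes should make the query work:
1. In kafka.sasl.jaas.config, change org.apache.kafka.common.security.plain.PlainLoginModule to kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule. The Kafka client bundled with the runtime is relocated under the kafkashaded package, as the class names in the error messages below show.
2. Pass kafka.bootstrap.servers and kafka.security.protocol with separate .option() calls instead of through the kafkaParams dictionary, and set kafka.security.protocol to SASL_SSL (as in the working code below).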
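If you build the JAAS string in more than one place, a small helper keeps the shaded class name consistent. This is a minimal sketch, not an official API: the function name build_plain_jaas_config is hypothetical, and it assumes a runtime (such as Databricks) where the Kafka client lives under kafkashaded. It rejects double quotes in credentials rather than guessing how they should be escaped.

```python
def build_plain_jaas_config(username: str, password: str, shaded: bool = True) -> str:
    """Build a sasl.jaas.config value for SASL/PLAIN (hypothetical helper).

    shaded=True uses the kafkashaded.* class name expected by a runtime that
    relocates the Kafka client; shaded=False gives the upstream Apache Kafka name.
    """
    # A double quote would end the quoted value early, so refuse it instead of
    # guessing at escaping rules.
    for label, value in (("username", username), ("password", password)):
        if '"' in value:
            raise ValueError(f"{label} must not contain a double quote")
    prefix = "kafkashaded." if shaded else ""
    module = f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule"
    return f'{module} required username="{username}" password="{password}";'


# Example with placeholder credentials.
print(build_plain_jaas_config("alice", "secret"))
```

In the notebook you would then set "kafka.sasl.jaas.config": build_plain_jaas_config(kafkaUsername, kafkaPassword) inside kafkaParams.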
Steps to Reproduce the Error and How to Address It
When I ran the code with your settings exactly as shown below, I got the same error:
kafkaParams = {
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}

df = (
    spark.readStream.format("kafka")
    .options(**kafkaParams)
    .load()
)
display(df)
kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
After applying the changes above, the query ran correctly:
kafkaParams = {
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafkaBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .options(**kafkaParams)
    .load()
)
display(df)
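Because the failing runs differ from the working one in more than one setting, it can help to check the combined options before starting the stream. This is a minimal sketch under assumptions: check_kafka_options and SHADED_PREFIX are hypothetical names, and it assumes you merge the .option() values and kafkaParams into one dict. It only catches client-side configuration mistakes; whether the broker expects SASL_SSL or SASL_PLAINTEXT depends on the broker's listener configuration, which this cannot see.

```python
# Hypothetical helper: sanity-check Spark Kafka source options before calling .load().
# Assumes the runtime bundles a shaded Kafka client under "kafkashaded." (as on Databricks).
SHADED_PREFIX = "kafkashaded.org.apache.kafka."
VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}


def check_kafka_options(options: dict) -> list:
    """Return human-readable problems found in the combined Kafka options."""
    problems = []
    jaas = options.get("kafka.sasl.jaas.config", "")
    if jaas and not jaas.startswith(SHADED_PREFIX):
        problems.append("kafka.sasl.jaas.config does not use a kafkashaded.* login module")
    if not options.get("kafka.bootstrap.servers"):
        problems.append("kafka.bootstrap.servers is missing")
    # Kafka falls back to PLAINTEXT when security.protocol is not set.
    protocol = options.get("kafka.security.protocol", "PLAINTEXT")
    if protocol not in VALID_PROTOCOLS:
        problems.append(f"unknown kafka.security.protocol: {protocol!r}")
    elif jaas and not protocol.startswith("SASL_"):
        problems.append("a JAAS config is set but kafka.security.protocol is not SASL_*")
    return problems


# Example: merge the .option() values with kafkaParams, then check them.
connection = {
    "kafka.bootstrap.servers": "broker.example.com:9092",  # placeholder address
    "kafka.security.protocol": "SASL_SSL",
}
kafka_params = {
    "kafka.sasl.jaas.config": SHADED_PREFIX
    + 'common.security.plain.PlainLoginModule required username="u" password="p";',
    "kafka.sasl.mechanism": "PLAIN",
}
for problem in check_kafka_options({**connection, **kafka_params}):
    print("WARNING:", problem)
```

With the settings from the working example, the loop prints nothing; with the original unshaded class name and no protocol set, it reports each problem separately.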
As an additional note, when I kept the kafkashaded class name but left kafka.bootstrap.servers and kafka.security.protocol (still set to SASL_PLAINTEXT) inside kafkaParams instead of passing them via .option, a timeout error occurred:
kafkaParams = {
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": kafkaBootstrapServers,
    "subscribe": kafkaTopic,
    "group.id": kafkaGroupID,
}

df = (
    spark.readStream.format("kafka")
    .options(**kafkaParams)
    .load()
)
display(df)
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
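"Timed out waiting for a node assignment" means the admin client never got a usable connection to a broker. One way to narrow this down (my own suggestion, not something established above) is to first confirm plain TCP reachability from the cluster driver. The helper names parse_bootstrap_servers and check_tcp_reachability are hypothetical. A successful connect only proves the port is open; it does not test TLS or SASL, so if TCP works but the query still times out, a mismatch between the broker listener and kafka.security.protocol (for example SASL_PLAINTEXT against a TLS listener) is one possible cause worth checking.

```python
import socket


def parse_bootstrap_servers(servers: str) -> list:
    """Split 'host1:9092,host2:9093' into [(host, port), ...]."""
    result = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {entry!r}")
        # Strip brackets from IPv6 literals such as [::1]:9092.
        result.append((host.strip("[]"), int(port)))
    return result


def check_tcp_reachability(servers: str, timeout: float = 5.0) -> dict:
    """Try a plain TCP connect to each bootstrap server.

    Maps "host:port" to None on success, or to an error string on failure.
    This does not perform a TLS handshake or SASL authentication.
    """
    results = {}
    for host, port in parse_bootstrap_servers(servers):
        key = f"{host}:{port}"
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[key] = None
        except OSError as exc:  # covers refused connections, DNS errors and timeouts
            results[key] = f"{type(exc).__name__}: {exc}"
    return results
```

In a notebook cell on the same cluster you could run check_tcp_reachability(kafkaBootstrapServers) and look for entries that are not None.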
I ran into the same error myself, so I hope this answer helps others who hit it.