Failed to create new KafkaAdminClient
05-13-2024 02:25 PM
I want to connect to Kafka with spark.readStream using the following parameters:
kafkaParams = {
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_PLAINTEXT",
"kafka.bootstrap.servers": kafkaBootstrapServers,
"subscribe": kafkaTopic,
"group.id": kafkaGroupID
}
I get the following error: kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
The problem seems to be that it cannot find org.apache.kafka.common.security.plain.PlainLoginModule.
However, if I use the Python library confluent_kafka with the same parameters in a Databricks notebook, everything works correctly.
Does anyone have an idea what causes this?
12-23-2024 01:58 AM
Solution
Applying the following changes should make it work:
- Change org.apache.kafka.common.security.plain.PlainLoginModule to kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule.
- Specify kafka.bootstrap.servers and kafka.security.protocol with separate .option() calls instead of inside the parameter dictionary (note that the working example below also sets kafka.security.protocol to SASL_SSL).
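For the first change, only the login module's class name in the JAAS string has to be rewritten; the rest of the string stays the same. A minimal sketch, assuming the JAAS string starts with a class from the org.apache.kafka. package (as in the question) and that the kafkashaded. prefix seen in the error's class names is the one the cluster expects; the helper name shade_jaas_config is hypothetical:

```python
# Package of the upstream Kafka client classes, and the prefix used by the
# shaded copy seen in the error (kafkashaded.org.apache.kafka...).
KAFKA_PACKAGE = "org.apache.kafka."
SHADED_PREFIX = "kafkashaded."


def shade_jaas_config(jaas_config: str) -> str:
    """Prefix the login module class in a JAAS config string with kafkashaded.

    Strings that already use the shaded class name are returned unchanged
    (apart from surrounding whitespace, which is stripped).
    """
    stripped = jaas_config.strip()
    if stripped.startswith(SHADED_PREFIX + KAFKA_PACKAGE):
        return stripped
    if stripped.startswith(KAFKA_PACKAGE):
        return SHADED_PREFIX + stripped
    raise ValueError(
        "JAAS config does not start with a Kafka login module class: " + stripped[:60]
    )


if __name__ == "__main__":
    # Placeholder credentials (hypothetical).
    original = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="my_user" password="my_password";'
    )
    print(shade_jaas_config(original))
```

The helper prefixes any org.apache.kafka. class rather than only PlainLoginModule, so the same rewrite would also apply to other Kafka login modules.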
Steps to Reproduce the Error and How to Address It
When I ran the code with your settings exactly as shown below, I got the same error:
kafkaParams = {
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_PLAINTEXT",
"kafka.bootstrap.servers": kafkaBootstrapServers,
"subscribe": kafkaTopic,
"group.id": kafkaGroupID,
}
df = (
spark.readStream.format("kafka")
.options(**kafkaParams)
.load()
)
display(df)
kafkashaded.org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
After applying the changes above, the query worked correctly:
kafkaParams = {
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
"kafka.sasl.mechanism": "PLAIN",
"subscribe": kafkaTopic,
"group.id": kafkaGroupID,
}
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("kafka.security.protocol", "SASL_SSL")
.options(**kafkaParams)
.load()
)
display(df)
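The working configuration can be collected in one place so it is easy to reuse across notebooks. A minimal sketch with a hypothetical helper build_kafka_options; it defaults to SASL_SSL as in the working example (pass "SASL_PLAINTEXT" if your brokers do not use TLS). It leaves out the unprefixed "group.id": Spark's Kafka source only forwards options carrying the kafka. prefix to the Kafka client and by default generates its own consumer group id per query, so "group.id" as written is not passed to Kafka.

```python
def build_kafka_options(bootstrap_servers, topic, username, password,
                        security_protocol="SASL_SSL"):
    """Return Kafka source options for SASL/PLAIN with the shaded login module.

    Hypothetical helper: the keys mirror the working example above.
    """
    jaas_config = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": security_protocol,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
        "subscribe": topic,
    }


if __name__ == "__main__":
    # Placeholder values (hypothetical).
    opts = build_kafka_options("broker:9092", "my_topic", "my_user", "my_password")
    for key, value in opts.items():
        print(key, "=", value)
```

In a notebook, the returned dictionary would then be passed as spark.readStream.format("kafka").options(**opts).load().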
As an additional note, when kafka.bootstrap.servers and kafka.security.protocol were passed inside the dictionary instead of via .option (with the protocol still set to SASL_PLAINTEXT), a timeout error occurred:
kafkaParams = {
"kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafkaUsername}" password="{kafkaPassword}";',
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_PLAINTEXT",
"kafka.bootstrap.servers": kafkaBootstrapServers,
"subscribe": kafkaTopic,
"group.id": kafkaGroupID,
}
df = (
spark.readStream.format("kafka")
.options(**kafkaParams)
.load()
)
display(df)
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
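In PySpark, DataStreamReader.options(**kwargs) sets each entry just as a separate .option() call would, with later calls overriding earlier ones for the same key. That makes it possible to compare the effective options of the timing-out and working variants directly. A small sketch with placeholder values (broker:9092, my_topic, my_group and the credentials are hypothetical); the helpers effective_options and diff_options are also hypothetical and, unlike Spark, treat keys case-sensitively:

```python
def effective_options(*option_calls):
    """Merge option dicts in call order; later calls win, as with repeated .option()/.options()."""
    merged = {}
    for opts in option_calls:
        merged.update(opts)
    return merged


def diff_options(a, b):
    """Return {key: (value_in_a, value_in_b)} for every key whose value differs."""
    keys = sorted(set(a) | set(b))
    return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)}


jaas = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username="user" password="secret";'
)
common = {
    "kafka.sasl.jaas.config": jaas,
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": "my_topic",
    "group.id": "my_group",
}

# Timing-out variant: everything passed through a single .options(**kafkaParams) call.
timing_out = effective_options({
    **common,
    "kafka.security.protocol": "SASL_PLAINTEXT",
    "kafka.bootstrap.servers": "broker:9092",
})

# Working variant: two .option() calls followed by .options(**kafkaParams).
working = effective_options(
    {"kafka.bootstrap.servers": "broker:9092"},
    {"kafka.security.protocol": "SASL_SSL"},
    common,
)

print(diff_options(timing_out, working))
```

With these values the only key that differs is kafka.security.protocol, so if you hit the timeout it may be worth checking whether your brokers expect SASL_SSL rather than SASL_PLAINTEXT.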
I ran into the same error myself, and I hope this answer helps others who encounter it.
12-23-2024 03:15 AM
The error indicates a missing Kafka client dependency for Spark in Databricks. Ensure the correct Kafka connector library is attached to your Databricks cluster, such as org.apache.spark:spark-sql-kafka-0-10_2.12:x.x.x (replace x.x.x with your Spark version). Additionally, verify that the kafka.sasl.jaas.config parameter is properly formatted and that the required Kafka dependencies (e.g., kafka-clients) are compatible with your Spark version. If the issue persists, check your cluster's runtime environment for conflicting or missing JARs.
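The connector coordinate above follows a fixed pattern: the artifact name carries the Scala binary version and the version field is the Spark version. A minimal sketch with a hypothetical helper kafka_connector_coordinate, assuming Scala 2.12 by default; in a notebook the running Spark version is available as spark.version. This is mainly relevant on open-source Spark (for example with spark-submit --packages), since the kafkashaded. class names in the error suggest a bundled Kafka client is already present on the Databricks cluster.

```python
def kafka_connector_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate of Spark's Kafka connector (spark-sql-kafka-0-10)."""
    parts = spark_version.split(".")
    # Expect at least major.minor.patch, e.g. "3.5.0".
    if len(parts) < 3 or not all(p.isdigit() for p in parts[:3]):
        raise ValueError(f"expected a version like 3.5.0, got {spark_version!r}")
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


if __name__ == "__main__":
    # Example Spark version (placeholder); use spark.version on a real cluster.
    print(kafka_connector_coordinate("3.5.0"))
```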
02-18-2025 02:02 AM
If you are using Confluent with Schema Registry, you can use the code below; no additional libraries need to be installed. Starting with Databricks Runtime 16.0, schema references and recursive references are supported:
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro
topic = "topic_name"
schema_registry_subject = f"{topic}-value"
schema_registry_url = 'schema_registry_url'
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": 'schema_registry_api_key:schema_registry_api_secret'
}
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "bootstrap_servers") \
.option("subscribe", topic) \
.option("startingOffsets", "earliest") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.sasl.jaas.config", 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="sasl_api_key" password="sasl_api_secret";') \
.load()
df = df.select(
from_avro(
data = col("value"),
options = schema_registry_options,
subject = schema_registry_subject,
schemaRegistryAddress = schema_registry_url
).alias("value")
)
display(df)
More info here:
https://docs.databricks.com/en/structured-streaming/avro-dataframe.html#authenticate-to-an-external-...
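The subject name and Schema Registry options passed to from_avro in the code above can be derived from the topic and the API credentials. A minimal sketch with a hypothetical helper schema_registry_settings, assuming Confluent's default TopicNameStrategy (subject "<topic>-value" for values, "<topic>-key" for keys) and basic auth with USER_INFO as in the example; the topic name and credentials in the usage lines are placeholders:

```python
def schema_registry_settings(topic, api_key, api_secret, record_part="value"):
    """Return (subject, options) for from_avro with Confluent Schema Registry basic auth."""
    if record_part not in ("key", "value"):
        raise ValueError("record_part must be 'key' or 'value'")
    # TopicNameStrategy: subject is "<topic>-key" or "<topic>-value".
    subject = f"{topic}-{record_part}"
    options = {
        "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
        "confluent.schema.registry.basic.auth.user.info": f"{api_key}:{api_secret}",
    }
    return subject, options


if __name__ == "__main__":
    subject, options = schema_registry_settings("topic_name", "my_key", "my_secret")
    print(subject)
    print(options)
```

The two return values would be passed as the subject and options arguments of from_avro, alongside schemaRegistryAddress.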