kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

shivank25
New Contributor II

I'm trying to read messages from a Confluent Kafka topic using Databricks, but I keep getting the error below. Can you let me know if I'm missing something?

code

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": "BROKER:9093",
    "subscribe": "sample_topic",
    "kafka.group.id": "group_id",
    "startingOffsets": "earliest"
}

df = (spark
      .readStream
      .format("kafka")
      .options(**options)
      .load())

json_df = df.selectExpr("CAST(value as STRING) as json")
query = json_df.writeStream.format("console").start()
query.awaitTermination()

 

ERROR

org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:334)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:302)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:299)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:132)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$2(MicroBatchExecution.scala:566)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:566)

1 REPLY

Anonymous
Not applicable

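The stack trace shows the failure occurring while Spark fetches the earliest offsets for the Kafka topic: KafkaOffsetReaderConsumer cannot retrieve the initial partition offsets because, as the exception message says, the Kafka consumer could not be constructed. That usually points to a problem with the connection or security configuration rather than with the offsets themselves.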

To troubleshoot this issue, here are a few steps you can follow:

  1. Verify the Kafka broker details: Double-check that the Kafka broker information specified in the "kafka.bootstrap.servers" option is accurate. Ensure that the provided address and port match the actual Kafka broker you intend to connect to.

  2. Validate the security settings: If your Kafka cluster requires authentication and encryption, ensure that you have provided the necessary security configurations correctly. The options "kafka.sasl.jaas.config", "kafka.sasl.mechanism", and "kafka.security.protocol" need to be set according to your Kafka cluster's security requirements.

  3. Check the topic name and group ID: Ensure that the topic name specified in the "subscribe" option matches the Kafka topic you want to read from. Additionally, verify that the group ID specified in "kafka.group.id" is valid and unique for your application.

  4. Review access permissions: Make sure that the credentials used in the "kafka.sasl.jaas.config" have the necessary permissions to read from the Kafka topic. Check the ACLs (Access Control Lists) on the Kafka cluster to ensure that the user or service account has the appropriate read access.
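Parts of steps 1 to 3 can be checked mechanically before starting the stream. The sketch below is only a heuristic, not an official Databricks or Kafka utility: `check_kafka_options` and `can_reach` are hypothetical helper names, and the sample values are placeholders. One check rests on an assumption worth verifying for your runtime: the exception in the thread title carries a `kafkashaded.` prefix, which suggests the Kafka client on Databricks is shaded, so the login module class in the JAAS string may need the same prefix.

```python
import re
import socket


def check_kafka_options(options):
    """Return a list of likely problems in a Spark Kafka options dict (heuristics only)."""
    problems = []

    # Step 1: bootstrap servers should be a comma-separated list of host:port pairs.
    raw = options.get("kafka.bootstrap.servers", "")
    servers = [s.strip() for s in raw.split(",") if s.strip()]
    if not servers:
        problems.append("kafka.bootstrap.servers is missing or empty")
    for server in servers:
        if not re.fullmatch(r"[^\s:]+:\d{1,5}", server):
            problems.append(f"bootstrap server {server!r} is not in host:port form")

    # Step 2: with SASL, the JAAS entry needs non-empty credentials and a final ';'.
    if options.get("kafka.security.protocol", "").startswith("SASL"):
        jaas = options.get("kafka.sasl.jaas.config", "")
        for field in ("username", "password"):
            match = re.search(field + r"""\s*=\s*(["'])(.*?)\1""", jaas)
            if match is None or not match.group(2):
                problems.append(f"JAAS {field} is missing or empty")
        if not jaas.rstrip().endswith(";"):
            problems.append("JAAS config does not end with ';'")
        # Assumption: the Databricks runtime shades the Kafka client classes.
        if jaas.lstrip().startswith("org.apache.kafka."):
            problems.append(
                "login module class is unshaded; a 'kafkashaded.' prefix may be needed"
            )

    # Step 3: topic and group ID should not contain leftover quote characters.
    for key in ("subscribe", "kafka.group.id"):
        value = options.get(key, "")
        if '"' in value or "'" in value:
            problems.append(f"{key} contains a stray quote character: {value!r}")

    return problems


def can_reach(host, port, timeout=5.0):
    """Return True if a plain TCP connection succeeds (does not test TLS or SASL)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Placeholder values, deliberately flawed to show the kind of report produced.
    sample = {
        "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";',
        "kafka.security.protocol": "SASL_SSL",
        "kafka.bootstrap.servers": "BROKER:9093",
        "subscribe": "sample_topic",
        "kafka.group.id": "group_id",
    }
    for problem in check_kafka_options(sample):
        print(problem)
```

Running `can_reach(host, port)` from a notebook cell on the cluster itself shows whether the broker is reachable from where Spark runs, which can differ from a local machine. Step 4 (ACLs) cannot be checked this way and still has to be verified on the Kafka side.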
