I'm trying to read messages from a Confluent Kafka topic using Databricks, but I keep getting the error below. Can you let me know if I'm missing something?
Code:
options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": "BROKER:9093",
    "subscribe": "sample_topic",
    "kafka.group.id": "group_id",
    "startingOffsets": "earliest"
}
df = (spark
      .readStream
      .format("kafka")
      .options(**options)
      .load())
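# Kafka delivers the message value as binary; cast it to a string
json_df = df.selectExpr("CAST(value AS STRING) AS json")
# Print each micro-batch to the console sink
query = json_df.writeStream.format("console").start()
query.awaitTermination()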
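A small sketch for building the same options dict without hand-written quoting, which is where the `kafka.group.id` value with a stray `"` came from. `build_kafka_options` and its parameters are hypothetical names, not part of Spark or Databricks. The login module defaults to the class name used in the post; Databricks' own Confluent examples, as far as I know, use the shaded name `kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule` because the runtime relocates the Kafka client, so that is worth trying via `login_module` if it applies to your runtime.

```python
# Hypothetical helper (not a Spark/Databricks API): builds the readStream
# options from the post and rejects values that would break JAAS quoting.
DEFAULT_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def build_kafka_options(bootstrap_servers, topic, api_key, api_secret,
                        group_id=None, login_module=DEFAULT_LOGIN_MODULE):
    # A double quote inside a value would terminate the JAAS string early
    # (or, for group_id, silently become part of the group name).
    for name, value in (("api_key", api_key), ("api_secret", api_secret),
                        ("group_id", group_id or "")):
        if '"' in value:
            raise ValueError(f"{name} must not contain a double quote")

    # JAAS syntax: <LoginModule> required key="value" key="value";
    jaas = (f'{login_module} required '
            f'username="{api_key}" password="{api_secret}";')

    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,
        "startingOffsets": "earliest",
    }
    # Only set the consumer group when one is actually given
    if group_id is not None:
        options["kafka.group.id"] = group_id
    return options


if __name__ == "__main__":
    opts = build_kafka_options("BROKER:9093", "sample_topic",
                               "<api-key>", "<api-secret>", group_id="group_id")
    print(opts["kafka.sasl.jaas.config"])
    # In a Databricks notebook (where `spark` is predefined):
    # df = spark.readStream.format("kafka").options(**opts).load()
```

The validation is deliberately strict: rejecting a quote is simpler and safer than guessing how the JAAS parser would treat an escaped one.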
Error:
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:334)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:302)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:299)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:132)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$2(MicroBatchExecution.scala:566)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:566)