
In Databricks, a Python Kafka consumer notebook connecting to Confluent Cloud over SASL/PLAIN authentication fails with the exception below

wchen
New Contributor II

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:321)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:209)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
    at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
    ... 46 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at kafkashaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    ... 50 more
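
A note on the root cause: the innermost "Caused by" shows the JVM failing to load org.apache.kafka.common.security.plain.PlainLoginModule. On Databricks the Kafka client is relocated under the kafkashaded. prefix (visible in every frame above), so a JAAS config that names the unshaded class typically fails with exactly this LoginException. A minimal sketch of a JAAS string matching the shaded client, with placeholder credentials:

EH_SASL = (
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
    # Placeholders: substitute your Confluent Cloud API key and secret.
    'username="myApiKey" password="myApiSecret";'
)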

1 ACCEPTED SOLUTION


Okay, I figured out the problem. It only works if I use .option("kafka.sasl.jaas.config", EH_SASL).

It doesn't work when I specify the same credentials as

.option("kafka.sasl.username", "myuser")

.option("kafka.sasl.password", "mypwd")


4 REPLIES

bigdata70
New Contributor III

@Kaniz Fatma, I am having the same issue.

%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# Decode the 4-byte big-endian schema ID embedded in each record value.
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

df = (spark.readStream.format("kafka")
      .option("subscribe", "mytopic")
      .option("kafka.bootstrap.servers", "host:port")
      .option("startingOffsets", "earliest")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")
      # These two options are what this reply reports as failing;
      # see the accepted solution (kafka.sasl.jaas.config) above.
      .option("kafka.sasl.username", "myuser")
      .option("kafka.sasl.password", "mypwd")
      .option("kafka.group.id", "TestGroup2")
      .load()
      .withColumn('key', fn.col("key").cast(StringType()))
      .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
      .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
      .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId', 'fixedValue'))

display(df)
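
An aside on the value parsing in this snippet, before the error output: the substring arithmetic appears to follow the Confluent Schema Registry wire format, where byte 1 of each record value is a magic byte (0), bytes 2 through 5 are the big-endian schema ID, and the rest is the serialized payload (Spark's substring is 1-indexed). A standalone sketch of the same split in plain Python, with an illustrative record:

import struct

def split_confluent_record(value: bytes):
    # Wire format: 1 magic byte, 4-byte big-endian schema ID, then the payload.
    _magic, schema_id = struct.unpack(">bI", value[:5])
    return schema_id, value[5:]

# Illustrative record: schema ID 42 followed by a fake payload.
schema_id, payload = split_confluent_record(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
print(schema_id, payload)  # 42 b'avro-bytes'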

Top of the exception stack

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)

Yes @Kaniz Fatma. This is a pretty basic snippet, so you should be able to try and reproduce it with an Azure Event Hub / Kafka broker and the snippet above.


