In Databricks, a Python Kafka consumer app in a notebook connecting to Confluent Cloud hits the issue captured in the body of the question; SASL/PLAIN authentication is being used.

wchen
New Contributor II

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:321)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:209)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
    at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
    ... 46 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at kafkashaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    ... 50 more

1 ACCEPTED SOLUTION

Accepted Solutions

bigdata70
New Contributor III

Okay, figured out the problem. It works only if I use .option("kafka.sasl.jaas.config", EH_SASL).

It doesn't work when I specify the same details as:

.option("kafka.sasl.username", "myuser")
.option("kafka.sasl.password", "mypwd")

View solution in original post

7 REPLIES

Kaniz
Community Manager

Hi @wenfei chen, can you please share the code snippet?

bigdata70
New Contributor III

@Kaniz Fatma, I am having the same issue.

%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# Decode the 4-byte schema registry ID embedded in each message value
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

df = (spark.readStream.format("kafka")
          .option("subscribe", "mytopic")
          .option("kafka.bootstrap.servers", "host:port")
          .option("startingOffsets", "earliest")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.username", "myuser")
          .option("kafka.sasl.password", "mypwd")
          .option("kafka.group.id", "TestGroup2")
          .load()
          .withColumn('key', fn.col("key").cast(StringType()))
          .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
          .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
          .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId', 'fixedValue'))

display(df)

Top of the exception stack

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)

Kaniz
Community Manager

Hi @Kannan Sundararajan, did you follow the Spark documentation for this?

bigdata70
New Contributor III

Yes, @Kaniz Fatma. This is a pretty basic snippet, so you should be able to try and reproduce it with an Azure Event Hubs or Kafka endpoint and the snippet above.

bigdata70
New Contributor III

[screenshot of the exception attached]

bigdata70
New Contributor III

Okay, figured out the problem. It works only if I use .option("kafka.sasl.jaas.config", EH_SASL).

It doesn't work when I specify the same details as:

.option("kafka.sasl.username", "myuser")
.option("kafka.sasl.password", "mypwd")

Kaniz
Community Manager

Awesome, @Kannan Sundararajan! Thank you for sharing the solution with the community. Now that this thread is solved, would you like to mark your answer as "The Best"?
