cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question: SASL/PLAIN authentication being used

wchen
New Contributor II

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)

at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)

at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)

at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:321)

at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:209)

at scala.Option.getOrElse(Option.scala:189)

at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)

at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)

at scala.Option.getOrElse(Option.scala:189)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)

at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)

at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)

at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)

at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)

at scala.collection.TraversableLike.map(TraversableLike.scala:238)

at scala.collection.TraversableLike.map$(TraversableLike.scala:231)

at scala.collection.AbstractTraversable.map(Traversable.scala:108)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)

at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)

at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)

at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)

at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)

at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)

at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)

at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)

at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)

at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)

at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)

... 46 more

Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)

at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)

at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)

at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)

at javax.security.auth.login.LoginContext.login(LoginContext.java:587)

at kafkashaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)

at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)

at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)

at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)

... 50 more

1 ACCEPTED SOLUTION

Accepted Solutions

Okay figured the problem. So it works only if I use the .option("kafka.sasl.jaas.config", EH_SASL)

Doesn't work when I specify the same details as

option("kafka.sasl.username", “myuser”)

.option("kafka.sasl.password", “mypwd”)

View solution in original post

4 REPLIES 4

bigdata70
New Contributor III

@Kaniz Fatma​ I am having the same issue.

%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
 
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
 
df = spark.readStream.format("kafka")
          .option("subscribe", "mytopic")
          .option("kafka.bootstrap.servers", “host:port”)
          .option("startingOffsets", "earliest")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.username", “myuser”)
          .option("kafka.sasl.password", “mypwd”)
          .option("kafka.group.id", "TestGroup2")
          .load()
          .withColumn('key', fn.col("key").cast(StringType()))
          .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
          .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
          .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key','valueSchemaId','fixedValue'))
 
display(df)

Top of the exception stack

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)

at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)

at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)

Yes @Kaniz Fatma​ . This is pretty basic snippet. So you should be able to try & repro, with an Azure Event Hub / Kafka , and above snippet.

image

Okay figured the problem. So it works only if I use the .option("kafka.sasl.jaas.config", EH_SASL)

Doesn't work when I specify the same details as

option("kafka.sasl.username", “myuser”)

.option("kafka.sasl.password", “mypwd”)

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group