<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24958#M1398</link>
    <description>&lt;P&gt;@Kaniz Fatma&amp;nbsp;I am having the same issue.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
&amp;nbsp;
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
&amp;nbsp;
df = spark.readStream.format("kafka")
          .option("subscribe", "mytopic")
          .option("kafka.bootstrap.servers", “host:port”)
          .option("startingOffsets", "earliest")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.username", “myuser”)
          .option("kafka.sasl.password", “mypwd”)
          .option("kafka.group.id", "TestGroup2")
          .load()
          .withColumn('key', fn.col("key").cast(StringType()))
          .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
          .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
          .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key','valueSchemaId','fixedValue'))
&amp;nbsp;
display(df)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Top of the exception stack&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:823)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:665)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:613)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 13 Apr 2022 18:00:11 GMT</pubDate>
    <dc:creator>bigdata70</dc:creator>
    <dc:date>2022-04-13T18:00:11Z</dc:date>
    <item>
      <title>In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used</title>
      <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24956#M1396</link>
      <description>&lt;P&gt;&lt;/P&gt;&lt;P&gt;kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:823)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:632)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:613)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:321)&lt;/P&gt;&lt;P&gt;	at 
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:209)&lt;/P&gt;&lt;P&gt;	at scala.Option.getOrElse(Option.scala:189)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)&lt;/P&gt;&lt;P&gt;	at scala.Option.getOrElse(Option.scala:189)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)&lt;/P&gt;&lt;P&gt;	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.map(TraversableLike.scala:238)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)&lt;/P&gt;&lt;P&gt;	at scala.collection.AbstractTraversable.map(Traversable.scala:108)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)&lt;/P&gt;&lt;P&gt;	at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)&lt;/P&gt;&lt;P&gt;	at &lt;A href="https://org.apache.spark.sql.execution.streaming.StreamExecution.org" alt="https://org.apache.spark.sql.execution.streaming.StreamExecution.org" 
target="_blank"&gt;org.apache.spark.sql.execution.streaming.StreamExecution.org&lt;/A&gt;$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)&lt;/P&gt;&lt;P&gt;Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:740)&lt;/P&gt;&lt;P&gt;	... 
46 more&lt;/P&gt;&lt;P&gt;Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)&lt;/P&gt;&lt;P&gt;	at java.security.AccessController.doPrivileged(Native Method)&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)&lt;/P&gt;&lt;P&gt;	at javax.security.auth.login.LoginContext.login(LoginContext.java:587)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.&amp;lt;init&amp;gt;(LoginManager.java:62)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)&lt;/P&gt;&lt;P&gt;	... 50 more&lt;/P&gt;</description>
      <pubDate>Tue, 22 Mar 2022 19:15:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24956#M1396</guid>
      <dc:creator>wchen</dc:creator>
      <dc:date>2022-03-22T19:15:22Z</dc:date>
    </item>
    <item>
      <title>Re: In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used</title>
      <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24958#M1398</link>
      <description>&lt;P&gt;@Kaniz Fatma&amp;nbsp;I am having the same issue.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
&amp;nbsp;
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
&amp;nbsp;
df = spark.readStream.format("kafka")
          .option("subscribe", "mytopic")
          .option("kafka.bootstrap.servers", “host:port”)
          .option("startingOffsets", "earliest")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.username", “myuser”)
          .option("kafka.sasl.password", “mypwd”)
          .option("kafka.group.id", "TestGroup2")
          .load()
          .withColumn('key', fn.col("key").cast(StringType()))
          .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
          .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
          .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key','valueSchemaId','fixedValue'))
&amp;nbsp;
display(df)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Top of the exception stack&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:823)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:665)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:613)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 13 Apr 2022 18:00:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24958#M1398</guid>
      <dc:creator>bigdata70</dc:creator>
      <dc:date>2022-04-13T18:00:11Z</dc:date>
    </item>
    <item>
      <title>Re: In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used</title>
      <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24960#M1400</link>
      <description>&lt;P&gt;Yes @Kaniz Fatma&amp;nbsp;. This is pretty basic snippet. So you should be able to try &amp;amp; repro, with an Azure Event Hub / Kafka , and above snippet.&lt;/P&gt;</description>
      <pubDate>Tue, 19 Apr 2022 16:03:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24960#M1400</guid>
      <dc:creator>bigdata70</dc:creator>
      <dc:date>2022-04-19T16:03:53Z</dc:date>
    </item>
    <item>
      <title>Re: In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used</title>
      <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24961#M1401</link>
      <description>&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="image"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2000i20AE18E515F0A0FB/image-size/large?v=v2&amp;amp;px=999" role="button" title="image" alt="image" /&gt;&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 19 Apr 2022 21:17:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24961#M1401</guid>
      <dc:creator>bigdata70</dc:creator>
      <dc:date>2022-04-19T21:17:43Z</dc:date>
    </item>
    <item>
      <title>Re: In Databricks, the Python kafka consumer app in notebook to Confluent Cloud having the issue captured in the Body of question:  SASL/PLAIN authentication being used</title>
      <link>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24962#M1402</link>
      <description>&lt;P&gt;Okay figured the problem. So it works only if I use the .option("kafka.sasl.jaas.config", EH_SASL)  &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Doesn't work when I specify the same details as &lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;option("kafka.sasl.username", “myuser”)&lt;/P&gt;&lt;P&gt;.option("kafka.sasl.password", “mypwd”)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 19 Apr 2022 21:29:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/in-databricks-the-python-kafka-consumer-app-in-notebook-to/m-p/24962#M1402</guid>
      <dc:creator>bigdata70</dc:creator>
      <dc:date>2022-04-19T21:29:02Z</dc:date>
    </item>
  </channel>
</rss>

