<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to test Kafka connectivity from a Databricks notebook in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20523#M13859</link>
    <description>&lt;P&gt;The code snippet below tests connectivity using the Kafka AdminClient directly, so Spark is not involved:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions
// TLS settings: adjust the store paths and passwords to your environment
val prop = new Properties()
prop.put("security.protocol", "SSL")
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.truststore.password", "changeit")
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.keystore.password", "changeit")
// Kafka bootstrap server; replace the placeholder host and port with your broker
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")
// Create the admin client from the properties above
val adminClient = AdminClient.create(prop)
// List all topics, including internal ones; get() blocks until the brokers answer or the request fails
val listTopicsOptions = new ListTopicsOptions()
listTopicsOptions.listInternal(true)
try println(adminClient.listTopics(listTopicsOptions).names().get())
finally adminClient.close()&lt;/CODE&gt;&lt;/PRE&gt;</description>
    <pubDate>Thu, 24 Jun 2021 17:15:18 GMT</pubDate>
    <dc:creator>brickster_2018</dc:creator>
    <dc:date>2021-06-24T17:15:18Z</dc:date>
    <item>
      <title>How to test Kafka connectivity from a Databricks notebook</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20522#M13858</link>
      <description>&lt;P&gt;My Structured Streaming job is failing because it cannot connect to Kafka. I believe the issue is with Spark. How can I determine whether it is a Spark library issue or an actual network issue?&lt;/P&gt;</description>
      <pubDate>Thu, 24 Jun 2021 17:14:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20522#M13858</guid>
      <dc:creator>brickster_2018</dc:creator>
      <dc:date>2021-06-24T17:14:39Z</dc:date>
    </item>
    <item>
      <title>Re: How to test Kafka connectivity from a Databricks notebook</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20523#M13859</link>
      <description>&lt;P&gt;The code snippet below tests connectivity using the Kafka AdminClient directly, so Spark is not involved:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions
// TLS settings: adjust the store paths and passwords to your environment
val prop = new Properties()
prop.put("security.protocol", "SSL")
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.truststore.password", "changeit")
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.keystore.password", "changeit")
// Kafka bootstrap server; replace the placeholder host and port with your broker
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")
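// Optional pre-flight check (a sketch added for illustration, not part of the original answer).
// It assumes the store locations are local paths such as the /dbfs mount used above, and it
// fails fast if either file cannot be read from the driver.
Seq("ssl.truststore.location", "ssl.keystore.location").foreach { key =&amp;gt;
  val path = java.nio.file.Paths.get(prop.getProperty(key))
  require(java.nio.file.Files.isReadable(path), s"$key is not readable: $path")
}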
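// Create the admin client from the properties above
val adminClient = AdminClient.create(prop)
// List all topics, including internal ones; get() blocks until the brokers answer or the request fails
val listTopicsOptions = new ListTopicsOptions()
listTopicsOptions.listInternal(true)
try println(adminClient.listTopics(listTopicsOptions).names().get())
finally adminClient.close()&lt;/CODE&gt;&lt;/PRE&gt;</description>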
      <pubDate>Thu, 24 Jun 2021 17:15:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20523#M13859</guid>
      <dc:creator>brickster_2018</dc:creator>
      <dc:date>2021-06-24T17:15:18Z</dc:date>
    </item>
    <item>
      <title>Re: How to test Kafka connectivity from a Databricks notebook</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20524#M13860</link>
      <description>&lt;P&gt;Hi @Harikrishnan Kunhumveettil,&lt;/P&gt;&lt;P&gt;I am also trying to connect to Heroku Kafka from a Databricks notebook, but I am facing challenges related to the JKS files. The Scala code I am using is given below:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
    .option("kafka.security.protocol","ssl")
    .option("kafka.ssl.truststore.location", "&amp;lt;&amp;lt;truststore.jks location on dbfs&amp;gt;&amp;gt;")
    .option("kafka.ssl.truststore.password", "&amp;lt;&amp;lt;password&amp;gt;&amp;gt;")
    .option("kafka.ssl.keystore.location", "&amp;lt;&amp;lt;keystore.jks location on dbfs&amp;gt;&amp;gt;")
    .option("kafka.ssl.keystore.password", "&amp;lt;&amp;lt;password&amp;gt;&amp;gt;")
    .option("subscribe", "&amp;lt;&amp;lt;topic_name&amp;gt;&amp;gt;")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The error that I am getting is given 
below:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;---Error-----&lt;/P&gt;&lt;P&gt;kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:823)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:632)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:613)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)&lt;/P&gt;&lt;P&gt;	at 
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)&lt;/P&gt;&lt;P&gt;	at scala.Option.getOrElse(Option.scala:189)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)&lt;/P&gt;&lt;P&gt;	at scala.Option.getOrElse(Option.scala:189)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)&lt;/P&gt;&lt;P&gt;	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.map(TraversableLike.scala:238)&lt;/P&gt;&lt;P&gt;	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)&lt;/P&gt;&lt;P&gt;	at scala.collection.AbstractTraversable.map(Traversable.scala:108)&lt;/P&gt;&lt;P&gt;	at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)&lt;/P&gt;&lt;P&gt;	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)&lt;/P&gt;&lt;P&gt;	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)&lt;/P&gt;&lt;P&gt;	at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)&lt;/P&gt;&lt;P&gt;Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.&amp;lt;init&amp;gt;(KafkaConsumer.java:740)&lt;/P&gt;&lt;P&gt;	... 46 more&lt;/P&gt;&lt;P&gt;Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.&amp;lt;init&amp;gt;(DefaultSslEngineFactory.java:285)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)&lt;/P&gt;&lt;P&gt;	at 
kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)&lt;/P&gt;&lt;P&gt;	... 50 more&lt;/P&gt;&lt;P&gt;Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks&lt;/P&gt;&lt;P&gt;	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)&lt;/P&gt;&lt;P&gt;	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)&lt;/P&gt;&lt;P&gt;	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)&lt;/P&gt;&lt;P&gt;	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)&lt;/P&gt;&lt;P&gt;	at java.nio.file.Files.newByteChannel(Files.java:361)&lt;/P&gt;&lt;P&gt;	at java.nio.file.Files.newByteChannel(Files.java:407)&lt;/P&gt;&lt;P&gt;	at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)&lt;/P&gt;&lt;P&gt;	at java.nio.file.Files.newInputStream(Files.java:152)&lt;/P&gt;&lt;P&gt;	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)&lt;/P&gt;&lt;P&gt;	... 56 more&lt;/P&gt;&lt;P&gt;---Error-----&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Let me know if you can help me out with this issue.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt;ARAI&lt;/P&gt;</description>
      <pubDate>Tue, 05 Apr 2022 08:13:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-test-kafka-connectivity-from-a-databricks-notebook/m-p/20524#M13860</guid>
      <dc:creator>Arai</dc:creator>
      <dc:date>2022-04-05T08:13:11Z</dc:date>
    </item>
  </channel>
</rss>

