Data Engineering

How to test Kafka connectivity from a Databricks notebook

brickster_2018
Databricks Employee

My Structured Streaming job is failing because it is unable to connect to Kafka. I believe the issue is with Spark. How can I determine whether this is a Spark library issue or an actual network issue?

Accepted Solutions

brickster_2018
Databricks Employee

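The code snippet below can be used to test connectivity. It uses the Kafka AdminClient directly, without Spark, so if it lists the topics, the network path and SSL settings are working and the problem is more likely on the Spark side.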

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions

// SSL settings; the store locations are local file paths read by the Kafka client
val prop = new Properties()
prop.put("security.protocol", "SSL")
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.truststore.password", "changeit")
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.keystore.password", "changeit")

// Replace with your broker host and port
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")

val adminClient = AdminClient.create(prop)

// List all topics, including internal ones; get() blocks until the broker answers or the request fails
val listTopicsOptions = new ListTopicsOptions()
listTopicsOptions.listInternal(true)
try {
  println(adminClient.listTopics(listTopicsOptions).names().get())
} finally {
  adminClient.close() // release the client's network threads
}
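
To separate a pure network problem from a Kafka client or SSL configuration problem, a plain TCP check can be run first. This is only a minimal sketch: `canConnect` is a hypothetical helper name, the 5-second timeout is an arbitrary choice, and the host and port are the placeholders from the snippet above. It only shows whether the machine running the notebook cell can open a socket to the broker; it says nothing about SSL or about other nodes in the cluster.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.{Failure, Success, Try}

// Hypothetical helper: tries to open a plain TCP connection to host:port.
// Returns Success if the socket connects within timeoutMs, Failure otherwise.
def canConnect(host: String, port: Int, timeoutMs: Int = 5000): Try[Unit] = Try {
  val socket = new Socket()
  try socket.connect(new InetSocketAddress(host, port), timeoutMs)
  finally socket.close()
}

// Placeholder broker address; replace with your own host and port
canConnect("kafka-server", 9092) match {
  case Success(_) => println("TCP connection succeeded")
  case Failure(e) => println(s"TCP connection failed: $e")
}
```

If the TCP check fails, the issue is probably DNS, routing, or a firewall. If it succeeds but the AdminClient test fails, the SSL settings or certificates are the more likely cause.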

Replies

Arai
New Contributor III

Hi @Harikrishnan Kunhumveettil,

I am also trying to connect to Heroku Kafka from a Databricks notebook, but I am running into problems with the JKS files. The Scala code I am using is given below:

--Code--

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
    .option("kafka.security.protocol","ssl")
    .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
    .option("kafka.ssl.truststore.password", "<<password>>")
    .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
    .option("kafka.ssl.keystore.password", "<<password>>")
    .option("subscribe", "<<topic_name>>")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)

--Code--

The error I am getting is given below:

---Error-----

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
    at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
    at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
    at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
    ... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
    at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
    at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
    at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
    at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
    at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
    at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
    at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
    ... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
    ... 56 more

---Error-----

Let me know if you can help me out with this issue.

Thanks,

ARAI
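
The innermost cause in the trace above is java.nio.file.NoSuchFileException for /FileStore/tables/certificates/keystore.jks. This suggests the Kafka client could not find the file at that local path, rather than a problem with the JKS contents. A minimal sketch for checking which local path is visible from the notebook follows. `describePath` is a hypothetical helper name, and the second path assumes DBFS is exposed under a local /dbfs prefix, as the paths in the accepted solution suggest.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper: reports whether a local file path exists and is readable
def describePath(path: String): String = {
  val p = Paths.get(path)
  if (!Files.exists(p)) s"$path: not found"
  else if (!Files.isReadable(p)) s"$path: exists but is not readable"
  else s"$path: readable, ${Files.size(p)} bytes"
}

// First path is the one from the error; second assumes a local /dbfs mount
Seq(
  "/FileStore/tables/certificates/keystore.jks",
  "/dbfs/FileStore/tables/certificates/keystore.jks"
).foreach(p => println(describePath(p)))
```

If only the /dbfs-prefixed path is reported as readable, using that path for the truststore and keystore locations may resolve the error.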
