
How to test Kafka connectivity from a Databricks notebook

User16869510359
Esteemed Contributor

My structured streaming job is failing because it's unable to connect to Kafka. I believe the issue is with Spark. How can I isolate whether it's a Spark library issue or an actual network issue?

ACCEPTED SOLUTION

User16869510359
Esteemed Contributor

The code snippet below can be used to test connectivity. It talks to the brokers through the Kafka AdminClient directly, with no Spark involved, so if it succeeds the network path and SSL configuration are fine and the problem lies in the Spark job.

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListTopicsOptions}

// Same SSL settings the streaming job uses; both stores point at the
// same JKS file here. Note the /dbfs/ FUSE prefix, which is how
// local-filesystem APIs see files stored in DBFS.
val prop = new Properties()
prop.put("security.protocol", "SSL")
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.truststore.password", "changeit")
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore")
prop.put("ssl.keystore.password", "changeit")
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")

val adminClient = AdminClient.create(prop)

// List all topics, including internal ones. If this prints the topic
// names, connectivity and the SSL handshake are working.
val listTopicsOptions = new ListTopicsOptions()
listTopicsOptions.listInternal(true)
println(adminClient.listTopics(listTopicsOptions).names().get())
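
If the AdminClient call also fails, a plain TCP check with no Kafka client involved narrows it down further; a minimal sketch, assuming the same broker host and port as above:

import java.net.{InetSocketAddress, Socket}

// Raw TCP reachability test: succeeds only if the network path to the
// broker is open, independent of any Kafka or SSL configuration.
val socket = new Socket()
try {
  socket.connect(new InetSocketAddress("kafka-server", 9092), 5000) // 5-second timeout
  println("TCP connection succeeded; the network path is open.")
} finally {
  socket.close()
}

If this raw connection succeeds but the AdminClient does not, the problem is in the SSL/Kafka configuration rather than the network.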


2 REPLIES


Arai
New Contributor III

Hi @Harikrishnan Kunhumveettil,

I am also trying to connect to Heroku Kafka from a Databricks notebook, but I'm facing challenges related to the .jks files. The Scala code I am using is given below:

--Code--

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
    .option("kafka.security.protocol", "ssl")
    .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
    .option("kafka.ssl.truststore.password", "<<password>>")
    .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
    .option("kafka.ssl.keystore.password", "<<password>>")
    .option("subscribe", "<<topic_name>>")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)

--Code--

The error I am getting is given below:

---Error-----

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more

---Error-----
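
One check I plan to run from the notebook is whether the JVM can actually see the keystore at that path. The failing path in the trace has no /dbfs/ prefix, while the accepted answer's paths do; a minimal sketch (the first path is taken from the trace, the /dbfs/-prefixed variant is an assumption about where the file actually lives):

import java.nio.file.{Files, Paths}

// The path from the stack trace, and the same path behind the DBFS
// FUSE mount (/dbfs/...), which is how plain-JVM file APIs such as
// Kafka's keystore loader see files stored in DBFS.
val candidates = Seq(
  "/FileStore/tables/certificates/keystore.jks",     // path from the error
  "/dbfs/FileStore/tables/certificates/keystore.jks" // assumed FUSE path
)
candidates.foreach { p =>
  println(s"$p exists: ${Files.exists(Paths.get(p))}")
}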

Let me know if you can help me out with this issue.

Thanks,

ARAI
