โ06-24-2021 10:14 AM
My structured streaming job is failing as it's unable to connect to Kafka. I believe the issue is with Spark. How can I isolate if it's a Spark library issue or an actual network issue.
โ06-24-2021 10:15 AM
The below code snippet can be used to test the connectivity
import java.util.Arrays
import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions
val prop = new Properties()
prop.put("security.protocol","SSL");
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore" );
prop.put("ssl.truststore.password","changeit");
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore" );
prop.put("ssl.keystore.password","changeit");
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")
val adminClient = AdminClient.create(prop)
val listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
println(adminClient.listTopics(listTopicsOptions).names().get());
โ06-24-2021 10:15 AM
The below code snippet can be used to test the connectivity
import java.util.Arrays
import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions
val prop = new Properties()
prop.put("security.protocol","SSL");
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore" );
prop.put("ssl.truststore.password","changeit");
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore" );
prop.put("ssl.keystore.password","changeit");
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")
val adminClient = AdminClient.create(prop)
val listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
println(adminClient.listTopics(listTopicsOptions).names().get());
โ04-05-2022 01:13 AM
Hi @Harikrishnan Kunhumveettilโ ,
I am also trying to connect to Heroku Kafka from the Databricks notebook, but facing challenges related to the jks files. The scala code I am using is given below:
--Code--
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
var streamingInputDF =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
.option("kafka.security.protocol","ssl")
.option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
.option("kafka.ssl.truststore.password", "<<password>>")
.option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
.option("kafka.ssl.keystore.password", "<<password>>")
.option("subscribe", "<<topic_name>>")
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
display(streamingInputDF)
--Code--
Error that I am getting is given below:
---Error-----
kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more
---Error-----
Let me know if you can help me out with this issue.
Thanks,
ARAI
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group