04-06-2022 03:19 AM
This question is posted on behalf of @Abhijit Rai.
As part of a POC, we are trying to connect to Heroku Kafka from the Databricks workspace.
On the Heroku Kafka side, the connection configuration is provided as four variables:
------- Connection Variables -------
KAFKA_CLIENT_CERT (in .pem format)
KAFKA_CLIENT_CERT_KEY (in .pem format)
KAFKA_TRUSTED_CERT (in .pem format)
KAFKA_URL (kafka+ssl://<<broker1:port>>,kafka+ssl://<<broker2:port>>, ...)
------- Connection Variables -------
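One detail worth flagging up front (my assumption based on the standard Kafka client, not something stated in this thread): the client's bootstrap.servers setting expects bare host:port pairs, so the kafka+ssl:// scheme that Heroku puts in KAFKA_URL would typically be stripped first, along these lines:

// Sketch: Heroku's KAFKA_URL prefixes each broker with kafka+ssl://, but the
// Kafka client's bootstrap.servers option expects plain host:port pairs.
val kafkaUrl = sys.env("KAFKA_URL") // assumes the Heroku variable is exported on the cluster
val bootstrapServers = kafkaUrl.replace("kafka+ssl://", "")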
On the Databricks side, we are trying to connect using the Scala code below:
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
    .option("kafka.ssl.truststore.password", "<<password>>")
    .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
    .option("kafka.ssl.keystore.password", "<<password>>")
    .option("subscribe", "<<topic_name>>")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)
------- Code -------
The error I am getting is below:
kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more
04-06-2022 03:34 AM
This question is posted on behalf of @Abhijit Rai.
What have I tried so far?
I have created the JKS files using this blog (a rough sketch of the conversion is shown after the link): https://blogs.mulesoft.com/dev-guides/api-connectors-templates/how-to-connect-to-apache-kafka-on-her...
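For reference, the conversion is roughly the following (a sketch with placeholder file names and password, not the blog's exact steps; it assumes openssl and keytool are available on the driver node):

import sys.process._

val pass = "changeit" // placeholder password

// Bundle the client cert and key (the two Heroku .pem values) into a PKCS12
// store, then convert that store into a JKS keystore.
Seq("openssl", "pkcs12", "-export",
  "-in", "client_cert.pem", "-inkey", "client_cert_key.pem",
  "-out", "client.p12", "-name", "client", "-password", s"pass:$pass").!
Seq("keytool", "-importkeystore",
  "-srckeystore", "client.p12", "-srcstoretype", "PKCS12", "-srcstorepass", pass,
  "-destkeystore", "keystore.jks", "-deststoretype", "JKS", "-deststorepass", pass).!

// Import the broker's trusted cert into a separate JKS truststore.
Seq("keytool", "-importcert", "-noprompt", "-file", "trusted_cert.pem",
  "-keystore", "truststore.jks", "-storepass", pass).!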
I have tried executing the notebook with the JKS files placed both on an S3 bucket mounted on DBFS and directly in DBFS locations.
But with both storage options, I am still getting the error “Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS”.
Kindly help in resolving this issue.
04-06-2022 03:37 AM
Hi @Abhijit Rai, please feel free to add any further details.
04-06-2022 09:29 PM
I am trying to connect to Kafka as described in the Databricks documentation, but unfortunately I am still unable to connect. I am following the link below.
https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#use-ssl
04-07-2022 12:47 AM
Hi @Abhijit Rai, did you take a look at this?
Important
You should not set the following Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception:
group.id, auto.offset.reset, key.deserializer, value.deserializer, enable.auto.commit, interceptor.classes
04-07-2022 02:14 AM
I don't think I have used any of the above parameters in my code. Should I use them?
04-07-2022 02:18 AM
Hi @Abhijit Rai,
It says you should not set those Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception. If you have not used them, good for you.
04-07-2022 02:57 AM
The only point here, @Kaniz Fatma, is that the error says it is unable to load the SSL keystore, when in fact I have placed the files at the correct location and can list them with the command below:
display(dbutils.fs.ls("/FileStore/tables/"))
But even then I am unable to access them.
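One possible explanation (my reading of the stack trace above, not something confirmed in the thread): dbutils.fs.ls resolves DBFS paths, but the NoSuchFileException is raised in sun.nio.fs, meaning the shaded Kafka client opens the keystore with java.nio on the local filesystem. A quick sanity check is whether the file is also visible through the local FUSE mount of DBFS:

// The Kafka client reads the keystore location as a local file, so check the
// /dbfs FUSE view rather than the DBFS path that dbutils.fs understands.
new java.io.File("/dbfs/FileStore/tables/certificates/keystore.jks").exists()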
04-07-2022 03:27 AM
Hi @Abhijit Rai, please try prefixing the location and check:
dbfs:/FileStore
or
/dbfs/FileStore
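In option form, that suggestion would look something like the sketch below (the paths reuse the one from the stack trace; brokers, topic, and passwords are placeholders). Since the shaded Kafka client opens these locations with java.nio, only the local /dbfs FUSE form would be expected to resolve, not the dbfs:/ URI:

val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.security.protocol", "SSL")
  // Local FUSE view of DBFS; a dbfs:/ URI is not a valid java.nio path.
  .option("kafka.ssl.truststore.location", "/dbfs/FileStore/tables/certificates/truststore.jks")
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("kafka.ssl.keystore.location", "/dbfs/FileStore/tables/certificates/keystore.jks")
  .option("kafka.ssl.keystore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")
  .load()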
04-08-2022 12:11 AM
It's giving the same error with both of the above options.
04-10-2022 04:28 AM
@Kaniz Fatma, is there anything else I can try to resolve this issue? Also, can you tell me if there is a recommended way to create the truststore and keystore certificates? I am assuming there may be a problem with the way I am creating them.
04-11-2022 02:55 AM
Hi @Abhijit Rai, please go through the following doc.
Also, the files are automatically generated by Databricks and are stored on the filesystem of the driver node, which is not part of DBFS.
They are in the directory $DB_HOME/keys ($DB_HOME is an environment variable that points to the Databricks root directory on the driver).
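If the FUSE path still fails, one workaround worth trying (a sketch, not something confirmed in this thread) is to copy the keystores from DBFS onto the driver's local disk before starting the stream, and to point the kafka.ssl.*.location options at the local copies:

// Copy from DBFS to the driver's local filesystem so the Kafka client can
// open the files with java.nio; /tmp is an arbitrary local destination.
dbutils.fs.cp("dbfs:/FileStore/tables/certificates/keystore.jks", "file:/tmp/keystore.jks")
dbutils.fs.cp("dbfs:/FileStore/tables/certificates/truststore.jks", "file:/tmp/truststore.jks")

Note that the executors need the files as well; a cluster init script is the usual way to stage them on every node.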
04-26-2022 08:38 AM
Hi @Abhijit Rai, just a friendly follow-up: do you still need help? Please let us know.
06-30-2022 05:42 AM
I am getting a similar error when I connect to Kafka using Spark streaming from a notebook or a Spark job.
I can find the JKS file in the mounted DBFS location, but Spark streaming complains:
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore dbfs://mnt/xxxxxxx/code/kafka.client.truststore.imported.jks of type JKS
I also installed org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 on the cluster.
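Based on the earlier posts, the dbfs://mnt/... form in that error message is likely the problem (my assumption): the Kafka client opens the location with java.nio, so a dbfs:// URI cannot resolve, whereas the FUSE view of the same mount might:

// Hypothetical fix: address the same mounted file through the local FUSE
// view of DBFS instead of a dbfs:// URI.
val truststoreLocation = "/dbfs/mnt/xxxxxxx/code/kafka.client.truststore.imported.jks"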