04-06-2022 03:19 AM
This question is posted on behalf of @Abhijit Rai.
In one of our POCs, we are trying to connect to Heroku Kafka from a Databricks workspace.
On the Heroku Kafka side, the connection details are provided as the four variables below:
-------Connection Variables
KAFKA_CLIENT_CERT (in .pem format)
KAFKA_CLIENT_CERT_KEY (in .pem format)
KAFKA_TRUSTED_CERT (in .pem format)
KAFKA_URL (kafka+ssl://<<broker1:port>>,kafka+ssl://<<broker2:port>>,...)
-------Connection Variables
On the Databricks side, we are trying to connect using the Scala code below:
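Kafka's kafka.bootstrap.servers option expects plain host:port pairs, while KAFKA_URL as described above carries a kafka+ssl:// scheme on every entry. A minimal Scala sketch of the conversion, assuming KAFKA_URL is a comma-separated list of kafka+ssl://host:port entries; the object name HerokuKafkaUrl and the example hosts are hypothetical.

```scala
object HerokuKafkaUrl {
  // Turn "kafka+ssl://host1:port1,kafka+ssl://host2:port2" into "host1:port1,host2:port2".
  // Each entry is trimmed, its kafka+ssl:// scheme removed, and empty entries dropped.
  def toBootstrapServers(kafkaUrl: String): String =
    kafkaUrl
      .split(",")
      .iterator
      .map(_.trim.stripPrefix("kafka+ssl://").trim)
      .filter(_.nonEmpty)
      .mkString(",")
}
```

For example, HerokuKafkaUrl.toBootstrapServers("kafka+ssl://b1.example.com:9096,kafka+ssl://b2.example.com:9096") gives "b1.example.com:9096,b2.example.com:9096", which can be passed straight to .option("kafka.bootstrap.servers", ...).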
import org.apache.spark.sql.functions.{get_json_object, json_tuple}

val streamingInputDF =
  spark.readStream
    .format("kafka")
    // bootstrap.servers takes plain host:port pairs, so the kafka+ssl:// prefix from KAFKA_URL is dropped
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
    .option("kafka.ssl.truststore.password", "<<password>>")
    .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
    .option("kafka.ssl.keystore.password", "<<password>>")
    .option("subscribe", "<<topic_name>>")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)
--Code
The error I am getting is below:
kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more
04-06-2022 03:34 AM
This question is posted on behalf of @Abhijit Rai.
What have I tried so far?
I created the JKS files by following this blog: https://blogs.mulesoft.com/dev-guides/api-connectors-templates/how-to-connect-to-apache-kafka-on-her...
I have tried running the notebook with the JKS files both in an S3 bucket mounted on DBFS and in DBFS locations such as -
But even with the JKS files in both storage options, I still get the error "Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS".
Please help me resolve this issue.
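As an alternative to the keytool-based steps in the blog (keytool -importcert is the usual command-line route), the truststore can also be built directly from the KAFKA_TRUSTED_CERT PEM with the standard java.security APIs. This is a minimal sketch, assuming KAFKA_TRUSTED_CERT holds one or more PEM-encoded CA certificates; PemTruststore, the alias names, and the output path in the usage note are hypothetical.

```scala
import java.io.{ByteArrayInputStream, FileOutputStream}
import java.nio.charset.StandardCharsets
import java.security.KeyStore
import java.security.cert.{CertificateFactory, X509Certificate}
import scala.collection.mutable.ArrayBuffer

object PemTruststore {
  // Parse every certificate in a PEM bundle such as KAFKA_TRUSTED_CERT; the X.509
  // CertificateFactory accepts Base64 PEM blocks with BEGIN/END CERTIFICATE markers.
  def parseCertificates(pem: String): Seq[X509Certificate] = {
    val certs = CertificateFactory
      .getInstance("X.509")
      .generateCertificates(new ByteArrayInputStream(pem.getBytes(StandardCharsets.US_ASCII)))
    val out = ArrayBuffer.empty[X509Certificate]
    val it = certs.iterator()
    while (it.hasNext) it.next() match {
      case c: X509Certificate => out += c
      case _                  => () // skip anything that is not an X.509 certificate
    }
    out.toList
  }

  // Add each CA certificate to an empty in-memory JKS keystore as a trusted entry.
  def buildTruststore(certs: Seq[X509Certificate]): KeyStore = {
    val ks = KeyStore.getInstance("JKS")
    ks.load(null, null) // initialise an empty keystore
    certs.zipWithIndex.foreach { case (cert, i) => ks.setCertificateEntry(s"heroku-ca-$i", cert) }
    ks
  }

  // Write the keystore to a local file path protected by the given password.
  def save(ks: KeyStore, path: String, password: String): Unit = {
    val out = new FileOutputStream(path)
    try ks.store(out, password.toCharArray)
    finally out.close()
  }
}
```

Usage, with trustedCertPem as a hypothetical String holding the KAFKA_TRUSTED_CERT value: PemTruststore.save(PemTruststore.buildTruststore(PemTruststore.parseCertificates(trustedCertPem)), "/dbfs/FileStore/tables/certificates/truststore.jks", "<<password>>"). The same password then goes into kafka.ssl.truststore.password.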
04-06-2022 03:37 AM
Hi @Abhijit Rai, please feel free to add any further details.
04-06-2022 09:29 PM
I am trying to follow the Databricks documentation for connecting to Kafka, but I am still unable to connect. I am following the link below:
https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#use-ssl
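The SSL setup on that page works through kafka.-prefixed options: the Spark Kafka source strips the kafka. prefix and hands the rest to the Kafka consumer as its ssl.* settings. A small sketch that collects those options in one place, assuming JKS truststore and keystore files at paths the Kafka client can read; KafkaSslOptions is a hypothetical helper, and kafka.ssl.key.password (Kafka's ssl.key.password) is only added when a separate key password is supplied.

```scala
object KafkaSslOptions {
  // Build the kafka.-prefixed SSL options for spark.readStream.format("kafka").
  def apply(
      bootstrapServers: String,
      truststorePath: String,
      truststorePassword: String,
      keystorePath: String,
      keystorePassword: String,
      keyPassword: Option[String] = None
  ): Map[String, String] = {
    val base = Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "kafka.security.protocol" -> "SSL",
      "kafka.ssl.truststore.location" -> truststorePath,
      "kafka.ssl.truststore.password" -> truststorePassword,
      "kafka.ssl.keystore.location" -> keystorePath,
      "kafka.ssl.keystore.password" -> keystorePassword
    )
    // Only pass a private-key password when one was given explicitly.
    keyPassword.fold(base)(p => base + ("kafka.ssl.key.password" -> p))
  }
}
```

In a notebook this could be used as spark.readStream.format("kafka").options(KafkaSslOptions(...)).option("subscribe", "<<topic_name>>").load(), since DataStreamReader.options accepts a Map[String, String].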
04-07-2022 12:47 AM
Hi @Abhijit Rai, did you take a look at this?
Important
You should not set the following Kafka parameters for the Kafka 0.10 connector as it will throw an exception:
04-07-2022 02:14 AM
I don't think I have used any of the above parameters in my code. Should I use them?
04-07-2022 02:18 AM
Hi @Abhijit Rai,
It says: "You should not set the following Kafka parameters for the Kafka 0.10 connector as it will throw an exception." If you have not used those parameters, that's fine.
04-07-2022 02:57 AM
The only issue here, @Kaniz Fatma, is that the error says it cannot load the SSL keystore, even though I have placed the files in the correct location. I can also see them with the command below:
display(dbutils.fs.ls("/FileStore/tables/"))
But even so, the Kafka code is unable to access them.
04-07-2022 03:27 AM
Hi @Abhijit Rai, please try prefixing the path with dbfs and check:
dbfs:/FileStore
or
/dbfs/FileStore
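Why the form of the prefix matters: the stack trace in the question ends in java.nio.file.NoSuchFileException thrown from Files.newInputStream, so the Kafka client opens the keystore with plain local file I/O, which does not understand dbfs:/ URIs. On Databricks the local view of DBFS is the /dbfs FUSE mount, so the location options need the /dbfs/... form, and the file must exist at exactly that path (the consumers also run on executors, so it has to be visible there too). The /dbfs mount may not be available on every cluster configuration. If /dbfs/... still fails, it may be worth checking the exact directory, for example /FileStore/tables/ versus /FileStore/tables/certificates/. A sketch of a hypothetical helper that normalises the path and fails early with a clearer message:

```scala
import java.nio.file.{Files, Path, Paths}

object DbfsLocalPath {
  // Map a DBFS-style path to the local /dbfs FUSE path that java.nio can open.
  // Assumption: a double-slash form such as dbfs://mnt/... was meant as dbfs:/mnt/...
  def toLocal(path: String): String =
    if (path.startsWith("/dbfs/")) path
    else if (path.startsWith("dbfs:/")) "/dbfs/" + path.stripPrefix("dbfs:/").dropWhile(_ == '/')
    else if (path.startsWith("/")) "/dbfs" + path
    else throw new IllegalArgumentException(s"not an absolute DBFS path: $path")

  // Check readability on this JVM before the Kafka consumer tries to load the file.
  def requireReadable(path: String): Path = {
    val p = Paths.get(toLocal(path))
    require(Files.isReadable(p), s"$p is not readable from this JVM")
    p
  }
}
```

For example, DbfsLocalPath.requireReadable("dbfs:/FileStore/tables/certificates/keystore.jks").toString returns "/dbfs/FileStore/tables/certificates/keystore.jks" when the file is present, and that string is what kafka.ssl.keystore.location would receive.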
04-08-2022 12:11 AM
It's giving the same error with both of the above options.
04-10-2022 04:28 AM
@Kaniz Fatma, is there anything else I can try to resolve this issue? Also, is there any other recommended way to create the truststore and keystore certificates? I suspect there may be a problem with the way I am creating them.
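One way to build the client keystore without an intermediate tool chain is to load KAFKA_CLIENT_CERT_KEY and KAFKA_CLIENT_CERT with java.security and write a JKS file. This sketch assumes the key is an unencrypted RSA key in PKCS#8 form (BEGIN PRIVATE KEY); if the Heroku key starts with BEGIN RSA PRIVATE KEY (PKCS#1), it would first need converting, for example with openssl pkcs8 -topk8 -nocrypt -in client_key.pem -out client_key_pk8.pem. PemKeystore, the alias, and the file names are hypothetical, and the same password is used for the store and the key entry as a simplification.

```scala
import java.io.{ByteArrayInputStream, FileOutputStream}
import java.nio.charset.StandardCharsets
import java.security.{KeyFactory, KeyStore, PrivateKey}
import java.security.cert.{Certificate, CertificateFactory}
import java.security.spec.PKCS8EncodedKeySpec
import java.util.Base64
import scala.collection.mutable.ArrayBuffer

object PemKeystore {
  private val Header = "-----BEGIN PRIVATE KEY-----"
  private val Footer = "-----END PRIVATE KEY-----"

  // Decode an unencrypted PKCS#8 PEM key (assumed RSA) into a PrivateKey.
  // PKCS#1 ("BEGIN RSA PRIVATE KEY") and encrypted keys are rejected.
  def parseRsaPkcs8(pem: String): PrivateKey = {
    require(pem.contains(Header) && pem.contains(Footer), "expected an unencrypted PKCS#8 PEM key")
    val body = pem.substring(pem.indexOf(Header) + Header.length, pem.indexOf(Footer))
    val der = Base64.getDecoder.decode(body.replaceAll("\\s", ""))
    KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der))
  }

  // Parse the client certificate chain (e.g. the KAFKA_CLIENT_CERT value).
  def parseChain(pem: String): Array[Certificate] = {
    val certs = CertificateFactory
      .getInstance("X.509")
      .generateCertificates(new ByteArrayInputStream(pem.getBytes(StandardCharsets.US_ASCII)))
    val out = ArrayBuffer.empty[Certificate]
    val it = certs.iterator()
    while (it.hasNext) out += it.next()
    out.toArray
  }

  // Store key + chain under one alias in a JKS file, reusing the store password for the key.
  def writeKeystore(key: PrivateKey, chain: Array[Certificate], path: String, password: String): Unit = {
    val ks = KeyStore.getInstance("JKS")
    ks.load(null, null)
    ks.setKeyEntry("heroku-client", key, password.toCharArray, chain)
    val out = new FileOutputStream(path)
    try ks.store(out, password.toCharArray)
    finally out.close()
  }
}
```

With clientKeyPem and clientCertPem as hypothetical Strings holding the two Heroku variables, PemKeystore.writeKeystore(PemKeystore.parseRsaPkcs8(clientKeyPem), PemKeystore.parseChain(clientCertPem), "/dbfs/FileStore/tables/certificates/keystore.jks", "<<password>>") would produce the file referenced by kafka.ssl.keystore.location.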
04-11-2022 02:55 AM
Hi @Abhijit Rai, please go through the following doc.
Also, the files are automatically generated by Databricks and stored on the driver node's local filesystem, which is not part of DBFS.
They are in the directory $DB_HOME/keys ($DB_HOME is an environment variable that points to the root directory for Databricks on the driver).
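Because that directory lives on the driver's local disk rather than in DBFS, dbutils.fs.ls will not show it; the local file API will. A small sketch for inspecting it from a Scala cell, assuming DB_HOME is set in the driver's environment as described; DriverKeysDir is a hypothetical helper, and these are the Databricks-generated files mentioned above, not the Heroku certificates.

```scala
import java.io.File

object DriverKeysDir {
  // Resolve $DB_HOME/keys from an environment map; None when DB_HOME is unset.
  def keysDir(env: Map[String, String]): Option[File] =
    env.get("DB_HOME").map(home => new File(home, "keys"))

  // List file names in that directory (empty if the variable or directory is missing).
  def listKeys(env: Map[String, String] = sys.env): Seq[String] =
    keysDir(env)
      .flatMap(dir => Option(dir.listFiles()))
      .map(_.toSeq.map(_.getName).sorted)
      .getOrElse(Seq.empty)
}
```

Calling DriverKeysDir.listKeys() on the driver lists whatever is in $DB_HOME/keys; on a machine without DB_HOME it simply returns an empty list.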
04-26-2022 08:38 AM
Hi @Abhijit Rai, just a friendly follow-up: do you still need help? Please let us know.
06-30-2022 05:42 AM
I am getting a similar error when I connect to Kafka using Spark Streaming, either from a notebook or from a Spark job.
I can find the JKS file in the mounted DBFS location, but Spark Streaming complains:
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore dbfs://mnt/xxxxxxx/code/kafka.client.truststore.imported.jks of type JKS
I also installed org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 on the cluster.