Connecting Databricks to Heroku Kafka

Kaniz
Community Manager

This question is posted on behalf of @Abhijit Rai.

For one of our POCs, we are trying to connect to Heroku Kafka from the Databricks workspace.

On the Heroku Kafka side, the connection details are provided as four variables, as below:

------- Connection Variables -------

KAFKA_CLIENT_CERT (in .pem format) 

KAFKA_CLIENT_CERT_KEY (in .pem format) 

KAFKA_TRUSTED_CERT (in .pem format) 

KAFKA_URL (kafka+ssl://<<broker1:port>>,kafka+ssl://<<broker2:port>>..........)

------- Connection Variables -------

On the Databricks side, we are trying to connect using the Scala code below:

import org.apache.spark.sql.functions.{get_json_object, json_tuple}
 
 
var streamingInputDF = 
 spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
  .option("kafka.security.protocol","ssl")
  .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
  .option("kafka.ssl.keystore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")   
  .option("startingOffsets", "latest") 
  .option("minPartitions", "10") 
  .option("failOnDataLoss", "true")
  .load()
 
 
display(streamingInputDF)


The error I am getting is given below:

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more

13 REPLIES

Kaniz
Community Manager

This question is posted on behalf of @Abhijit Rai.

What have I tried so far?

I have created the jks file using the blog https://blogs.mulesoft.com/dev-guides/api-connectors-templates/how-to-connect-to-apache-kafka-on-her...

I have tried executing the notebook with the JKS files placed both on an S3 bucket mounted on DBFS and directly in DBFS locations such as:

  1. /FileStore/tables/certificates/keystore.jks
  2. /FileStore/tables/certificates/truststore.jks

But with both storage options, I am still getting the error "Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS".

Kindly help in resolving this issue.

Kaniz
Community Manager

Hi @Abhijit Rai, please feel free to add any further details.

Arai
New Contributor III

I am trying to connect to Kafka as per the Databricks documentation, but I am unable to connect. I am following the link below.

https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#use-ssl

Kaniz
Community Manager

Hi @Abhijit Rai, did you take a look at this?

Important

You should not set the following Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception:

  • group.id: Setting this parameter is not allowed for Spark versions below 2.2.

  • auto.offset.reset: Instead, set the source option startingOffsets to specify where to start. To maintain consistency, Structured Streaming (as opposed to the Kafka Consumer) manages the consumption of offsets internally. This ensures that you don’t miss any data after dynamically subscribing to new topics/partitions. startingOffsets applies only when you start a new Streaming query, and resuming from a checkpoint always picks up from where the query left off.

  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.

  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values (see the sketch after this list).

  • enable.auto.commit: Setting this parameter is not allowed. Spark keeps track of Kafka offsets internally and doesn’t commit any offset.

  • interceptor.classes: The Kafka source always reads keys and values as byte arrays. It's not safe to use ConsumerInterceptor, as it may break the query.
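
For reference, a minimal sketch of what "use DataFrame operations to deserialize" can look like, assuming the streamingInputDF from the code above and UTF-8 string keys and values:

import org.apache.spark.sql.functions.col

// Keys and values arrive as binary; cast them to strings with DataFrame
// operations instead of setting key.deserializer / value.deserializer.
val decodedDF = streamingInputDF
  .select(
    col("key").cast("string").as("key"),
    col("value").cast("string").as("value"),
    col("topic"), col("partition"), col("offset"), col("timestamp"))

display(decodedDF)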

Arai
New Contributor III

I don't think I have used any of the above parameters in my code. Should I be using them?

Kaniz
Community Manager

Hi @Abhijit Rai,

It says you should not set those Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception. If you have not used them, you are fine.

Arai
New Contributor III

The only point here, @Kaniz Fatma, is that the error says it is unable to load the SSL keystore, even though I have placed the files at the correct location. I can also read them with the command below:

display(dbutils.fs.ls("/FileStore/tables/"))

but even then the stream is unable to access them.
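
One way to narrow this down, as a hedged aside: dbutils.fs.ls goes through the DBFS API, whereas the shaded Kafka client opens the keystore with plain Java file I/O, so it only sees paths that exist on the local filesystem of the node. Checking both forms of the path from the driver makes the difference visible:

import java.io.File

// Path as passed to the Kafka options: only visible if it exists on the local filesystem.
println(new File("/FileStore/tables/certificates/keystore.jks").exists)
// Same object through the DBFS FUSE mount, which standard clusters expose under /dbfs.
println(new File("/dbfs/FileStore/tables/certificates/keystore.jks").exists)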

Kaniz
Community Manager

Hi @Abhijit Rai, please try prefixing the path with dbfs and check both forms below; a short sketch follows:

dbfs:/FileStore

or

/dbfs/FileStore
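
A minimal sketch of the second variant, assuming the JKS files were uploaded to /FileStore/tables/certificates on DBFS. The shaded Kafka client resolves the keystore path with plain Java file I/O on each node, so the /dbfs FUSE form is usually the one that works; a dbfs:/ URI is not understood by the client. (Note also that the kafka+ssl:// scheme in Heroku's KAFKA_URL is a Heroku convention; kafka.bootstrap.servers generally expects plain host:port pairs.)

// Paths through the DBFS FUSE mount, visible to plain Java file I/O on each node.
val truststorePath = "/dbfs/FileStore/tables/certificates/truststore.jks"
val keystorePath   = "/dbfs/FileStore/tables/certificates/keystore.jks"

val sslStreamDF = spark.readStream
  .format("kafka")
  // Heroku-style kafka+ssl:// scheme stripped down to host:port pairs.
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", truststorePath)
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("kafka.ssl.keystore.location", keystorePath)
  .option("kafka.ssl.keystore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")
  .option("startingOffsets", "latest")
  .load()

display(sslStreamDF)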

Arai
New Contributor III

It's giving the same error with both of the above options.

Arai
New Contributor III

@Kaniz Fatma, is there anything else I can try to resolve this issue? Also, can you tell me if there is another recommended way to create the truststore and keystore certificates? I am assuming there may be a problem with the way I am creating them.

Kaniz
Community Manager

Hi @Abhijit Rai, please go through the following doc.

Also, the files are automatically generated by Databricks and are stored on the filesystem of the driver node, which is not part of DBFS.

They are in the directory $DB_HOME/keys ($DB_HOME is an environment variable that points to the root directory for Databricks on the driver).

Kaniz
Community Manager

Hi @Abhijit Rai, just a friendly follow-up. Do you still need help? Please let us know.

mj2022
New Contributor III

I am getting a similar error when I connect to Kafka using Spark Streaming from a notebook or a Spark job.

I can find the JKS file in the mounted DBFS location, but Spark Streaming complains:

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore dbfs://mnt/xxxxxxx/code/kafka.client.truststore.imported.jks of type JKS
 

I also installed org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 on the cluster.
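
A hedged sketch for this variant: the shaded Kafka client cannot open dbfs:// URIs, it only reads local file paths, so on a standard cluster (where the mount is also exposed through the /dbfs FUSE path) pointing the option at the FUSE form of the same file is usually what works. As a side note, the Kafka connector is already bundled with the Databricks Runtime (hence the kafkashaded prefix in the stack trace), so installing spark-sql-kafka separately should not be necessary and may conflict with the built-in one.

// Same mounted file, addressed through the FUSE path that plain Java file I/O can open.
val truststore = "/dbfs/mnt/xxxxxxx/code/kafka.client.truststore.imported.jks"

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<<host:port>>")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", truststore)
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")
  .load()

display(kafkaDF)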
