Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Connecting Databricks to Heroku Kafka

Kaniz_Fatma
Community Manager

This question is posted on behalf of @Abhijit Rai.

For one of our POCs, we are trying to connect to Heroku Kafka from a Databricks workspace.

On the Heroku Kafka side, the connection configuration is given as the following four variables:

-------Connection Variables-------

KAFKA_CLIENT_CERT (in .pem format) 

KAFKA_CLIENT_CERT_KEY (in .pem format) 

KAFKA_TRUSTED_CERT (in .pem format) 

KAFKA_URL (kafka+ssl:// <<broker1:port>>, kafka+ssl:// <<broker2:port>>..........) 

-------Connection Variables-------
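
For context, a minimal sketch of how the KAFKA_TRUSTED_CERT value could be imported into a JKS truststore that the Kafka client can load (assumptions: the PEM has been saved to a local file such as /tmp/kafka_trusted_cert.pem; the paths, alias, and password are placeholders):

import java.io.{FileInputStream, FileOutputStream}
import java.security.KeyStore
import java.security.cert.CertificateFactory

// parse the broker CA certificate from the PEM file
val caCert = CertificateFactory.getInstance("X.509")
  .generateCertificate(new FileInputStream("/tmp/kafka_trusted_cert.pem"))

// build an empty JKS store and add the CA certificate as a trusted entry
val trustStore = KeyStore.getInstance("JKS")
trustStore.load(null, null)
trustStore.setCertificateEntry("heroku-kafka-ca", caCert)
trustStore.store(new FileOutputStream("/tmp/truststore.jks"), "changeit".toCharArray)

The resulting file could then be copied into DBFS, for example with dbutils.fs.cp("file:/tmp/truststore.jks", "dbfs:/FileStore/tables/certificates/truststore.jks"), so the cluster can reach it.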

On the Databricks side, we are trying to connect using the Scala code below:

import org.apache.spark.sql.functions.{get_json_object, json_tuple}
 
 
var streamingInputDF = 
 spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
  .option("kafka.security.protocol","ssl")
  .option("kafka.ssl.truststore.location", "<<truststore.jks location on dbfs>>")
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("kafka.ssl.keystore.location", "<<keystore.jks location on dbfs>>")
  .option("kafka.ssl.keystore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")   
  .option("startingOffsets", "latest") 
  .option("minPartitions", "10") 
  .option("failOnDataLoss", "true")
  .load()
 
 
display(streamingInputDF)

--Code--

The error I am getting is given below:

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
... 56 more

13 REPLIES

Kaniz_Fatma
Community Manager

This question is posted on behalf of @Abhijit Rai.

What have I tried so far?

I have created the JKS files using this blog: https://blogs.mulesoft.com/dev-guides/api-connectors-templates/how-to-connect-to-apache-kafka-on-her...

I have tried executing the notebook with the JKS files placed both on an S3 bucket mounted on DBFS and directly in DBFS locations such as:

  1. /FileStore/tables/certificates/keystore.jks
  2. /FileStore/tables/certificates/truststore.jks

But even after placing the JKS files on both storage options, I am getting the error "Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS".

Kindly help in resolving this issue.

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai, please feel free to add any further details.

Arai
New Contributor III

I am trying to connect to Kafka as per the Databricks documentation but am unfortunately unable to connect. I am following the link below.

https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#use-ssl

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai, did you take a look at this?

Important

You should not set the following Kafka parameters for the Kafka 0.10 connector as it will throw an exception:

  • group.id: Setting this parameter is not allowed for Spark versions below 2.2.

  • auto.offset.reset: Instead, set the source option startingOffsets to specify where to start. To maintain consistency, Structured Streaming (as opposed to the Kafka consumer) manages the consumption of offsets internally. This ensures that you don't miss any data after dynamically subscribing to new topics/partitions. startingOffsets applies only when you start a new streaming query; resuming from a checkpoint always picks up from where the query left off.

  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys (see the sketch after this list).

  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

  • enable.auto.commit: Setting this parameter is not allowed. Spark keeps track of Kafka offsets internally and doesn't commit any offset.

  • interceptor.classes: The Kafka source always reads keys and values as byte arrays. It's not safe to use ConsumerInterceptor as it may break the query.
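
For example, a minimal sketch of deserializing the key and value with DataFrame operations instead of a Kafka deserializer, using the streamingInputDF from the code above (assumption: the producer wrote them as UTF-8 strings):

// key and value arrive as binary columns; cast them in the query instead of configuring a deserializer
val decoded = streamingInputDF.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset")

display(decoded)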

Arai
New Contributor III

I don't think I have used any of the above parameters in my code. Should I use them?

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai,

It says you should not set those Kafka parameters for the Kafka 0.10 connector, as doing so will throw an exception. If you have not used them, good for you.

Arai
New Contributor III

The only point here, @Kaniz Fatma, is that the error says it is unable to load the SSL keystore, even though I have placed the files at the correct location. I can read them with the command below as well.

display(dbutils.fs.ls("/FileStore/tables/"))

But even then I am unable to access them.
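
One way to narrow this down: dbutils.fs goes through DBFS, while the Kafka client opens ssl.keystore.location with plain Java file I/O on the node's local filesystem. A minimal sketch of checking both views (assumption: the files were uploaded under /FileStore/tables/certificates):

// DBFS view: what dbutils (and %fs) can see
display(dbutils.fs.ls("dbfs:/FileStore/tables/certificates/"))

// local-filesystem view: what the Kafka client will actually try to open
// (the same DBFS path is exposed on the nodes through the /dbfs FUSE mount)
println(new java.io.File("/dbfs/FileStore/tables/certificates/keystore.jks").exists())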

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai, please try prefixing the path with dbfs and check:

dbfs:/FileStore

or

/dbfs/FileStore
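
Putting that together, a minimal sketch of the same readStream call pointing the SSL options at the FUSE-mounted path (assumptions: the JKS files live under dbfs:/FileStore/tables/certificates/, and the bootstrap servers are given as plain host:port pairs, with Heroku's kafka+ssl:// prefix stripped, as the Kafka client typically expects):

val streamingDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")   // host:port only, no kafka+ssl:// scheme
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/dbfs/FileStore/tables/certificates/truststore.jks")
  .option("kafka.ssl.truststore.password", "<<password>>")
  .option("kafka.ssl.keystore.location", "/dbfs/FileStore/tables/certificates/keystore.jks")
  .option("kafka.ssl.keystore.password", "<<password>>")
  .option("subscribe", "<<topic_name>>")
  .option("startingOffsets", "latest")
  .load()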

Arai
New Contributor III

It's giving the same error with both of the above options.

Arai
New Contributor III

@Kaniz Fatma, is there anything else I can try to resolve this issue? Also, can you tell me if there is another recommended way to create the truststore and keystore certificates? I am assuming there may be a problem with the way I am creating them.
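
There is no single blessed way, but one sketch for the keystore side that stays inside the notebook (big assumptions: KAFKA_CLIENT_CERT_KEY has already been converted to an unencrypted PKCS#8 PEM, for example with openssl pkcs8 -topk8 -nocrypt, and the cert/key files sit under /tmp on the driver; paths, alias, and passwords are placeholders):

import java.io.{FileInputStream, FileOutputStream}
import java.nio.file.{Files, Paths}
import java.security.{KeyFactory, KeyStore}
import java.security.cert.CertificateFactory
import java.security.spec.PKCS8EncodedKeySpec
import java.util.Base64

// read the PKCS#8 PEM key, strip the header/footer, and decode the Base64 body
val keyPem = new String(Files.readAllBytes(Paths.get("/tmp/kafka_client_cert_key_pkcs8.pem")), "UTF-8")
val keyDer = Base64.getMimeDecoder.decode(keyPem.replaceAll("-----(BEGIN|END) PRIVATE KEY-----", ""))
val privateKey = KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(keyDer))

// parse the client certificate (KAFKA_CLIENT_CERT) from its PEM file
val clientCert = CertificateFactory.getInstance("X.509")
  .generateCertificate(new FileInputStream("/tmp/kafka_client_cert.pem"))

// build a JKS keystore containing the private key plus its certificate chain
val keyStore = KeyStore.getInstance("JKS")
keyStore.load(null, null)
keyStore.setKeyEntry("heroku-kafka-client", privateKey, "changeit".toCharArray, Array(clientCert))
keyStore.store(new FileOutputStream("/tmp/keystore.jks"), "changeit".toCharArray)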

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai, please go through the following doc.

Also, the files are automatically generated by Databricks and are stored on the filesystem of the driver node, which is not part of DBFS.

They are in the directory $DB_HOME/keys ($DB_HOME is an environment variable that points to the root directory for Databricks on the driver).
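
A minimal sketch of inspecting that directory from a notebook (assumptions: DB_HOME is set on the driver, the /databricks fallback is only a placeholder, and the listing is guarded in case the directory is absent):

// these keys live on the driver's local disk, so use plain Java file I/O rather than dbutils.fs
val dbHome = sys.env.getOrElse("DB_HOME", "/databricks")
Option(new java.io.File(s"$dbHome/keys").listFiles())
  .getOrElse(Array.empty[java.io.File])
  .foreach(f => println(f.getPath))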

Kaniz_Fatma
Community Manager

Hi @Abhijit Rai, just a friendly follow-up. Do you still need help? Please let us know.

mj2022
New Contributor III

I am getting a similar error when I connect to Kafka using Spark streaming from a notebook or a Spark job.

I can find the JKS file in the mounted DBFS location, but Spark streaming complains:

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore dbfs://mnt/xxxxxxx/code/kafka.client.truststore.imported.jks of type JKS
 

I also installed org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 on the cluster.
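
The dbfs:// scheme in that message is a hint (an assumption based on the error text): the Kafka client opens the location with java.nio on the local filesystem and does not understand DBFS URIs, so the mounted file is usually referenced through the /dbfs FUSE path instead. A minimal sketch:

// refer to the mounted truststore through the local FUSE path rather than a dbfs:// URI
val truststorePath = "/dbfs/mnt/xxxxxxx/code/kafka.client.truststore.imported.jks"
println(new java.io.File(truststorePath).exists())   // sanity check before wiring it into kafka.ssl.truststore.location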
