Unable to connect to Confluent from Databricks

Sascha
New Contributor III

I'm facing the same issue as this post: https://community.databricks.com/s/question/0D58Y00009DE82zSAD/databricks-kafka-read-not-connecting

In my case I'm connecting to Confluent Cloud. I'm able to ping the bootstrap server, and netstat connects successfully on port 9092. But when I try to consume the data using a batch consumer (or a stream, it doesn't matter), the log4j logs get flooded with this message:

22/10/04 07:39:18 WARN NetworkClient: [Consumer clientId=Databricks, groupId=new_group2] Bootstrap broker pkc-75m1o.europe-west3.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
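(A quick way to reproduce that reachability check from a notebook cell, as a minimal sketch; a successful connect only proves the network path, not the SASL/TLS handshake, which is where this error actually occurs.)

import socket

# Raw TCP check against the bootstrap server; raises on failure.
with socket.create_connection(("pkc-75m1o.europe-west3.gcp.confluent.cloud", 9092), timeout=10):
    print("TCP connection on port 9092 succeeded")

This is the code that produces the warning: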

userid = "<API Key>"
password = "<API Secret>"
host = "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092"
topic = "topic_0"
sasl_mech = "PLAIN"

inputDF = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", host) \
  .option("ssl.endpoint.identification.algorithm", "https") \
  .option("sasl.mechanism", sasl_mech) \
  .option("security.protocol", "SASL_SSL") \
  .option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password)) \
  .option("subscribe", topic) \
  .option("kafka.client.id", "Databricks") \
  .option("kafka.group.id", "new_group2") \
  .option("spark.streaming.kafka.maxRatePerPartition", "5") \
  .option("startingOffsets", "earliest") \
  .option("kafka.session.timeout.ms", "10000") \
  .option("retry.backoff.ms", "1000") \
  .option("value.deserializer", "ByteArrayDeserializer") \
  .load()

display(inputDF)

7 REPLIES

Debayan
Esteemed Contributor III

Hi @Sascha Zevenhuizen, have you enabled Kerberos security in the settings, by any chance?

Sascha
New Contributor III

Hi @Debayan Mukherjee, no I haven't.

But with the help of Confluent I changed the statement to the code below, and that solved it.

inputDF = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", host)
  # Kafka client options now carry the "kafka." prefix, unlike the original attempt.
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  # The JAAS login module class now uses the "kafkashaded." prefix.
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
  .option("subscribe", topic)
  .option("kafka.client.id", "Databricks")
  .option("kafka.group.id", "new_group2")
  .option("spark.streaming.kafka.maxRatePerPartition", "5")
  .option("startingOffsets", "earliest")
  .option("kafka.session.timeout.ms", "10000")
  .load() )
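(For context on why this works: Spark's Kafka source only forwards options prefixed with kafka. to the underlying consumer, so the unprefixed SASL/SSL options in the original code were silently ignored and the client attempted a plaintext connection, which the broker dropped. The Databricks runtime also ships a shaded Kafka client, so the JAAS config must reference the login module as kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule. A minimal sketch of how the resulting stream might then be consumed; the memory sink and query name below are illustrative placeholders, not part of the original post:)

(inputDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  # decode raw Kafka bytes
  .writeStream
  .format("memory")               # in-memory sink for quick inspection; use delta + checkpointLocation in practice
  .queryName("topic_0_preview")
  .start())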

Kaniz
Community Manager

Hi @Sascha Zevenhuizen, did you get a chance to read the doc that @Jose Gonzalez shared in the similar thread you posted?

Sascha
New Contributor III

Hi @Kaniz Fatma, that was the thread I started my troubleshooting with, and it's certainly helpful!

Kaniz
Community Manager

Awesome, thank you @Sascha Zevenhuizen!

Keep learning!

Kaniz
Community Manager

Hi @Sascha Zevenhuizen, thank you for your response. Shall I mark your answer as the best?

Sascha
New Contributor III

Yeah ok!
