10-06-2022 01:36 AM
I'm facing the same issue as in this post: https://community.databricks.com/s/question/0D58Y00009DE82zSAD/databricks-kafka-read-not-connecting
In my case I'm connecting to Confluent Cloud. I can ping the bootstrap server, and netstat succeeds on port 9092. But when I try to consume the data with a batch consumer (or a stream, it doesn't matter which), the log4j server logs get flooded with this message:
22/10/04 07:39:18 WARN NetworkClient: [Consumer clientId=Databricks, groupId=new_group2] Bootstrap broker pkc-75m1o.europe-west3.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
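The ping and port checks described above only show that the network path to the broker is open. A minimal sketch of the same kind of check in Python (the helper names `tcp_reachable` and `tls_handshake_ok` are hypothetical) can go one step further and attempt a TLS handshake. If both succeed while the broker still logs `disconnected`, the problem is most likely in the client's security settings rather than in the network. Neither check exercises SASL authentication.

```python
import socket
import ssl


def tcp_reachable(host, port, timeout=5.0):
    """Return True if a plain TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def tls_handshake_ok(host, port, timeout=5.0):
    """Return True if a TLS handshake with host:port succeeds.

    Uses the system CA bundle and verifies the certificate against `host`.
    SASL authentication is not attempted.
    """
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host):
                return True
    except OSError:  # ssl.SSLError and socket timeouts are OSError subclasses
        return False


# Example against the bootstrap server from the log (needs network access):
# broker = "pkc-75m1o.europe-west3.gcp.confluent.cloud"
# print(tcp_reachable(broker, 9092), tls_handshake_ok(broker, 9092))
```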
userid = "<API Key>"
password = "<API Secret>"
host = "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092"
topic = "topic_0"
sasl_mech = "PLAIN"
inputDF = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", host) \
    .option("ssl.endpoint.identification.algorithm", "https") \
    .option("sasl.mechanism", sasl_mech) \
    .option("security.protocol", "SASL_SSL") \
    .option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password)) \
    .option("subscribe", topic) \
    .option("kafka.client.id", "Databricks") \
    .option("kafka.group.id", "new_group2") \
    .option("spark.streaming.kafka.maxRatePerPartition", "5") \
    .option("startingOffsets", "earliest") \
    .option("kafka.session.timeout.ms", "10000") \
    .option("retry.backoff.ms", "1000") \
    .option("value.deserializer", "ByteArrayDeserializer") \
    .load()
display(inputDF)
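Spark's Kafka source only forwards options whose names start with `kafka.` to the underlying Kafka consumer (with the prefix stripped); everything else is read by the source itself or ignored. In the code above, `kafka.client.id` and `kafka.group.id` are prefixed, which is why they appear in the log line, but `security.protocol`, `sasl.mechanism` and `sasl.jaas.config` are not, so the consumer most likely falls back to its default PLAINTEXT protocol and the SASL_SSL listener drops the connection. The helper below is a hypothetical sketch (not a Spark API) that flags such options, using an assumed, non-exhaustive list of Kafka client setting names.

```python
# Hypothetical helper (not part of Spark): flags options that look like Kafka
# client settings but lack the "kafka." prefix, so Spark would not pass them on.

# Assumed, non-exhaustive subset of Kafka consumer configuration names.
KAFKA_CLIENT_KEYS = {
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.endpoint.identification.algorithm",
    "session.timeout.ms",
    "retry.backoff.ms",
    "client.id",
    "group.id",
}


def find_unprefixed_kafka_options(options):
    """Return the option keys that should probably carry a 'kafka.' prefix."""
    return sorted(key for key in options if key in KAFKA_CLIENT_KEYS)


# The options from the failing reader above (JAAS string shortened).
question_options = {
    "kafka.bootstrap.servers": "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092",
    "ssl.endpoint.identification.algorithm": "https",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": "<JAAS config>",
    "subscribe": "topic_0",
    "kafka.client.id": "Databricks",
    "kafka.group.id": "new_group2",
    "retry.backoff.ms": "1000",
}

print(find_unprefixed_kafka_options(question_options))
```

Separately, Spark always delivers `key` and `value` as binary columns and rejects `kafka.value.deserializer`, so the `value.deserializer` line can simply be dropped.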
10-06-2022 11:40 PM
Hi @Sascha Zevenhuizen, have you enabled Kerberos security in the settings by any chance?
10-07-2022 01:21 AM
Hi @Debayan Mukherjee, no, I haven't.
But with help from Confluent I changed the statement to the one below, and somehow that solved it.
inputDF = (spark
    .readStream
    .format("kafka")
    # Kafka client settings must carry the "kafka." prefix to reach the consumer
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    # Databricks shades the Kafka client, hence the "kafkashaded." package prefix
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    # DStream-era setting, not read by the Structured Streaming Kafka source
    # (its rate-limit option is maxOffsetsPerTrigger)
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
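The working configuration can be captured in a small helper. This is a minimal sketch, assuming a Confluent Cloud cluster with SASL_SSL and the PLAIN mechanism as in this thread; the function name `confluent_kafka_options` and its `shaded` flag are hypothetical. It applies the two changes that most likely made the difference: every Kafka client setting carries the `kafka.` prefix, and on Databricks the JAAS login module is referenced through the shaded `kafkashaded.` package.

```python
def confluent_kafka_options(bootstrap_servers, api_key, api_secret, topic,
                            shaded=True):
    """Return Kafka source options for a SASL_SSL + PLAIN cluster (hypothetical helper)."""
    login_module = "org.apache.kafka.common.security.plain.PlainLoginModule"
    if shaded:
        # Databricks Runtime ships a shaded Kafka client under "kafkashaded."
        login_module = "kafkashaded." + login_module
    jaas_config = (
        f"{login_module} required "
        f"username='{api_key}' password='{api_secret}';"
    )
    return {
        # Kafka client settings: Spark strips the "kafka." prefix before
        # handing each setting to the consumer.
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
        "kafka.ssl.endpoint.identification.algorithm": "https",
        # Source options read by Spark itself (no prefix).
        "subscribe": topic,
        "startingOffsets": "earliest",
    }


# Usage on a cluster with a SparkSession named `spark`:
# opts = confluent_kafka_options(host, userid, password, topic)
# inputDF = spark.readStream.format("kafka").options(**opts).load()
```

Returning a plain dict keeps the helper testable without a cluster, and because both `DataStreamReader.options(**opts)` and `DataFrameReader.options(**opts)` accept keyword options, the same dict works for `spark.readStream` and `spark.read`.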
10-07-2022 09:18 AM
Hi @Sascha Zevenhuizen, did you get a chance to read this doc that @Jose Gonzalez shared in a similar thread you started?
10-15-2022 03:01 AM
Hi @Kaniz Fatma, that was the thread I started my troubleshooting with, and it was definitely helpful!
10-15-2022 03:04 AM
Awesome, thank you @Sascha Zevenhuizen!
Keep learning!
10-15-2022 02:54 AM
Hi @Sascha Zevenhuizen, thank you for your response. Shall I mark your answer as the best one?
10-15-2022 03:02 AM
Yeah, OK!