Hello! I’m having trouble establishing a Kafka connection between my Databricks notebook and my Kafka cluster in Amazon MSK. I’ve run some tests and I’m really stuck, so I hope someone can help me.
I have two brokers. First, I checked connectivity with:
%sh
nc -vz <broker> <port>
and got this on both:
Connection to <broker> (<ip>) <port> port [tcp/*] succeeded!
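In case %sh isn’t available, the same TCP reachability check can be done from pure Python; a minimal sketch (the broker hostname and port stay placeholders, as above):

```python
import socket

def tcp_port_open(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# tcp_port_open("<broker>", 9096)  # placeholder broker from the nc test above
```

Note this only proves the port accepts TCP connections, like nc; it says nothing about TLS or SASL.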
I also ran:
%sh
openssl s_client -debug -connect <broker>:<port> -tls1_2
and received:
Start Time: 1753464158
Timeout : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes
Based on those results, I assume there are no network-level connectivity issues. Next, I started a streaming query with:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9096,broker2:9096") \
    .option("subscribe", "topic") \
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("group.id", "group_read_1") \
    .load()

display(df)
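For easier side-by-side comparison with the confluent_kafka config, the same options can be collected into a dict and passed with .options(**...); a sketch (values mirror the snippet above, credentials are placeholders):

```python
def kafka_stream_options(bootstrap: str, user: str, password: str) -> dict:
    """Build the Spark Kafka source options from the snippet above as a dict."""
    jaas = (
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        f'required username="{user}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap,
        "subscribe": "topic",
        "kafka.sasl.jaas.config": jaas,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "maxOffsetsPerTrigger": 10,
        "group.id": "group_read_1",
    }

# df = spark.readStream.format("kafka") \
#     .options(**kafka_stream_options("broker1:9096,broker2:9096", "user", "pass")) \
#     .load()
```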
However, it just hangs while initializing the stream, and after a few minutes it fails with:
java.util.concurrent.ExecutionException:
kafkashaded.org.apache.kafka.common.errors.TimeoutException:
Timed out waiting for a node assignment. Call: describeTopics
Here’s the strange part: when I use the confluent_kafka library and define a consumer, I can successfully read the topic and display the offsets:
from confluent_kafka import Consumer, KafkaException
import pandas as pd
import time
config_consumer = {
    'bootstrap.servers': 'broker1:9096,broker2:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'user',
    'sasl.password': 'pass',
    'group.id': 'group_1',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(config_consumer)
topic = 'topic'
consumer.subscribe([topic])

test = []
t_end = time.time() + 5
try:
    while time.time() < t_end:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        test.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8')
        })
finally:
    consumer.close()

df = pd.DataFrame(test)
display(df)
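For reference, the polling loop above can be factored into a small helper that works with any object exposing the confluent_kafka poll() API; a sketch, with the 5-second window as a parameter:

```python
import time

def collect_messages(consumer, seconds: float = 5.0, poll_timeout: float = 1.0) -> list:
    """Poll `consumer` for up to `seconds`, returning decoded records as dicts.

    Assumes the confluent_kafka Consumer interface: poll() returns None,
    a message with .error(), or a readable message.
    """
    records = []
    t_end = time.time() + seconds
    while time.time() < t_end:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        records.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8'),
        })
    return records

# test = collect_messages(consumer)  # same result as the loop above
```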
I don’t know what could be wrong. Any help would be greatly appreciated!