Spark Structured Streaming Timeout Waiting for KafkaAdminClient Node Assignment on Amazon MSK

Victor_Cruz_Mex

Hello! I’m having trouble establishing a Kafka connection between my Databricks notebook and my Kafka server in Amazon MSK. I’ve run some tests and I’m really stuck. I hope someone can help me.

I have two brokers. First, I checked connectivity with:

%sh
nc -vz <broker> <port>

and got this on both:

Connection to <broker> (<ip>) <port> port [tcp/*] succeeded!
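The nc check can also be run from Python, which is handy for looping over every entry in the bootstrap string. This is a minimal sketch using only the standard library; parse_bootstrap_servers and check_tcp are hypothetical helper names. Like nc, it only proves that a TCP connection opens, not that TLS or SASL will succeed.

```python
import socket


def parse_bootstrap_servers(servers: str) -> list[tuple[str, int]]:
    """Split a "host:port,host:port" bootstrap string into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # rpartition keeps everything before the last ":" as the host
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def check_tcp(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds (like nc -vz)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timeout
        return False
```

For example, `[check_tcp(h, p) for h, p in parse_bootstrap_servers("broker1:9096,broker2:9096")]` should return `[True, True]` when both brokers are reachable from the driver.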

I also ran:

%sh
openssl s_client -debug -connect <broker>:<port> -tls1_2

and received:

Start Time: 1753464158
Timeout   : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes
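A rough Python counterpart to the openssl s_client test is sketched below (check_tls is a hypothetical helper). It verifies the broker certificate and hostname against Python's default CA store, which comes from OpenSSL, so like openssl s_client it does not exercise the JVM truststore that the Spark Kafka client uses.

```python
import socket
import ssl
from typing import Optional


def check_tls(host: str, port: int, timeout: float = 5.0) -> Optional[str]:
    """Try a verified TLS 1.2+ handshake with host:port.

    Returns the negotiated protocol version (for example "TLSv1.2") on
    success, or None if the connection, handshake, or certificate
    verification fails.
    """
    # Default context: CERT_REQUIRED plus hostname checking, using the CA
    # certificates OpenSSL finds on this machine (not the JVM cacerts).
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls_sock:
                return tls_sock.version()
    except OSError:  # ssl.SSLError and socket timeouts are OSError subclasses
        return None
```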

Based on that, I assume there are no network or TLS connectivity issues. Next, I started a streaming query with:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9096,broker2:9096") \
    .option("subscribe", "topic") \
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("group.id", "group_read_1") \
    .load()

display(df)

However, it just hangs while initializing the stream, and after a few minutes it fails with:

java.util.concurrent.ExecutionException: 
kafkashaded.org.apache.kafka.common.errors.TimeoutException: 
Timed out waiting for a node assignment. Call: describeTopics

Here’s the strange part: when I use the confluent_kafka library and define a consumer, I can successfully read the topic and display the offsets:

from confluent_kafka import Consumer
import pandas as pd
import time

config_consumer = {
    'bootstrap.servers': 'broker1:9096,broker2:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'user',
    'sasl.password': 'pass',
    'group.id': 'group_1',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config_consumer)
topic = 'topic'
consumer.subscribe([topic])

test = []
t_end = time.time() + 5
try:
    while time.time() < t_end:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        test.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8') if msg.value() is not None else None
        })
finally:
    consumer.close()

df = pd.DataFrame(test)
display(df)
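To compare the working confluent_kafka settings with the Spark options, the librdkafka-style config can be translated into the kafka.-prefixed options the Spark Kafka source expects. This is a hypothetical helper that covers only the keys used in this post and assumes a SCRAM mechanism. The Java client has no sasl.username or sasl.password, so the credentials move into kafka.sasl.jaas.config; Spark forwards only kafka.-prefixed keys to the client, so group.id becomes kafka.group.id (Spark 3.0 and later); and Spark sets the starting position through startingOffsets rather than auto.offset.reset.

```python
def build_scram_jaas_config(username: str, password: str) -> str:
    """Build a kafka.sasl.jaas.config value for the Kafka SCRAM login module."""

    def quote(value: str) -> str:
        # Escape backslashes and double quotes so they survive inside the
        # quoted JAAS value (assumes the parser honours backslash escapes).
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f"username={quote(username)} password={quote(password)};"
    )


def confluent_to_spark_options(conf: dict) -> dict:
    """Translate the confluent_kafka consumer config used in this post into
    Spark Kafka source options (hypothetical helper, SCRAM only)."""
    mechanism = conf["sasl.mechanism"]
    if not mechanism.startswith("SCRAM-"):
        raise ValueError(f"only SCRAM mechanisms are handled, got {mechanism}")
    options = {
        "kafka.bootstrap.servers": conf["bootstrap.servers"],
        "kafka.security.protocol": conf["security.protocol"],
        "kafka.sasl.mechanism": mechanism,
        # The Java client has no sasl.username/sasl.password settings;
        # the credentials go into the JAAS config instead.
        "kafka.sasl.jaas.config": build_scram_jaas_config(
            conf["sasl.username"], conf["sasl.password"]
        ),
    }
    if "group.id" in conf:
        # Spark passes only "kafka."-prefixed options through to the client.
        options["kafka.group.id"] = conf["group.id"]
    if conf.get("auto.offset.reset") in ("earliest", "latest"):
        # Spark does not accept kafka.auto.offset.reset; startingOffsets
        # controls where a new query starts reading.
        options["startingOffsets"] = conf["auto.offset.reset"]
    return options
```

In the notebook, the result could be passed as `spark.readStream.format("kafka").options(**confluent_to_spark_options(config_consumer)).option("subscribe", topic).load()`. The translation does not add a truststore, so on its own it would not change the TLS behaviour.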

I don’t know what could be wrong. Any help would be greatly appreciated!

Victor_Cruz_Mex

We found the solution! Thanks to a Databricks architect, here’s how we fixed it:

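1.- Copy the JVM’s default cacerts file into a Unity Catalog volume so the Spark Kafka client has a truststore containing the certificate authorities that sign Amazon MSK’s broker certificates. From a notebook cell with shell access, run: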

%sh
JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
cp \
  $JAVA_HOME/lib/security/cacerts \
  /Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks
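If shell access is not available, the same copy can be done from Python. This sketch mirrors the `dirname $(dirname $(readlink -f $(which java)))` logic and also checks the older JDK 8 layout (jre/lib/security). The helper names are hypothetical, and it assumes the /Volumes path is writable as a local file path, as the %sh step above does.

```python
import os
import shutil
from typing import Optional


def java_home_from_binary(java_path: str) -> str:
    """Mirror dirname $(dirname $(readlink -f java)): resolve symlinks,
    then strip the trailing bin/java."""
    return os.path.dirname(os.path.dirname(os.path.realpath(java_path)))


def find_cacerts(java_home: str) -> Optional[str]:
    """Return the JVM's default cacerts path, trying the JDK 9+ layout
    (lib/security) and then the JDK 8 layout (jre/lib/security)."""
    for rel in ("lib/security/cacerts", "jre/lib/security/cacerts"):
        candidate = os.path.join(java_home, rel)
        if os.path.isfile(candidate):
            return candidate
    return None


def copy_truststore(dest: str, java_path: Optional[str] = None) -> str:
    """Copy the JVM cacerts file to dest and return dest."""
    java_path = java_path or shutil.which("java")
    if java_path is None:
        raise FileNotFoundError("java not found on PATH")
    cacerts = find_cacerts(java_home_from_binary(java_path))
    if cacerts is None:
        raise FileNotFoundError("cacerts not found under the resolved JAVA_HOME")
    shutil.copyfile(cacerts, dest)
    return dest
```

Usage would be `copy_truststore("/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks")`, keeping the same placeholders as the shell command.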

2.- Make sure the JAAS config uses the SCRAM login module, and point Spark at the new truststore in your spark.readStream call:

df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<broker1:9096,broker2:9096>")
         .option("subscribe", "<topic>")
         # Use the SCRAM login module for SCRAM‑SHA‑512
         .option(
             "kafka.sasl.jaas.config",
             'org.apache.kafka.common.security.scram.ScramLoginModule '
             'required username="<user>" password="<password>";'
         )
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
         # Truststore copied to the volume in step 1
         .option(
             "kafka.ssl.truststore.location",
             "/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks"
         )
         # Default Java keystore password (“changeit” unless you’ve overridden it)
         .option("kafka.ssl.truststore.password", "changeit")
         .option("maxOffsetsPerTrigger", 10)
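         # Spark forwards only "kafka."-prefixed options to the Kafka client,
         # so the consumer group id must be passed as kafka.group.id
         .option("kafka.group.id", "group_read_1")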
         .load()
)

display(df)

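Apparently the Spark Kafka client did not pick up the JVM’s bundled CA certificates by default in this setup, so the TLS handshake with the brokers never completed and the describeTopics call timed out. Copying the cacerts file into a volume and pointing kafka.ssl.truststore.location at it lets Spark complete the TLS handshake. Using the SCRAM login module (org.apache.kafka.common.security.scram.ScramLoginModule) ensures compatibility with SCRAM-SHA-512. With those two changes in place, the stream connected immediately and ran without errors. The earlier tests passed because nc only checks TCP reachability, while openssl s_client and confluent_kafka (which is built on the C library librdkafka) verify certificates against the system’s OpenSSL CA store rather than the JVM truststore.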
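Since the TLS problem surfaced here only as a generic timeout after several minutes, a quick sanity check on the truststore before starting the stream can save a round trip. The sketch below (hypothetical helpers) checks that the file named by kafka.ssl.truststore.location exists and guesses its keystore format from the leading bytes: JKS files start with 0xFEEDFEED, JCEKS files with 0xCECECECE, and PKCS12 files are DER-encoded, so they start with a SEQUENCE tag (0x30). Newer JDKs may ship cacerts as PKCS12; in that case setting kafka.ssl.truststore.type to PKCS12 may be needed.

```python
import os

# Leading bytes ("magic numbers") of keystore formats the JVM can load.
JKS_MAGIC = b"\xfe\xed\xfe\xed"
JCEKS_MAGIC = b"\xce\xce\xce\xce"
DER_SEQUENCE_TAG = b"\x30"  # PKCS12 files are DER-encoded ASN.1 sequences


def keystore_format(path: str) -> str:
    """Guess the keystore type of a file from its first bytes (heuristic)."""
    with open(path, "rb") as f:
        head = f.read(4)
    if head == JKS_MAGIC:
        return "JKS"
    if head == JCEKS_MAGIC:
        return "JCEKS"
    if head[:1] == DER_SEQUENCE_TAG:
        return "PKCS12"
    return "unknown"


def validate_truststore(options: dict) -> str:
    """Check the truststore named in Spark Kafka options and return its format."""
    path = options.get("kafka.ssl.truststore.location")
    if not path:
        raise ValueError("kafka.ssl.truststore.location is not set")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"truststore not found: {path}")
    fmt = keystore_format(path)
    if fmt == "unknown":
        raise ValueError(f"{path} does not look like a JKS, JCEKS or PKCS12 keystore")
    return fmt
```

Calling `validate_truststore({"kafka.ssl.truststore.location": "/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks"})` before `spark.readStream` would fail fast on a missing or wrong file instead of waiting for the node-assignment timeout.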
