Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark Structured Streaming Timeout Waiting for KafkaAdminClient Node Assignment on Amazon MSK

Victor_Cruz_Mex
New Contributor III

Hello! I’m having trouble establishing a Kafka connection between my Databricks notebook and my Kafka server in Amazon MSK. I’ve run some tests and I’m really stuck—I hope someone can help me.

I have two brokers. First, I checked connectivity with:

%sh
nc -vz <broker> <port>

and got this on both:

Connection to <broker> (<ip>) <port> port [tcp/*] succeeded!
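The same TCP-level reachability check can also be scripted from a Python cell using only the standard library; a minimal sketch (host and port are placeholders, and `can_connect` is just an illustrative helper name):

```python
import socket

def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """TCP reachability check, roughly equivalent to `nc -vz host port`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (placeholder broker):
# can_connect("<broker>", 9096)
```

Note this only confirms the TCP socket opens; it says nothing about TLS or SASL, which is why the later tests are still needed.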

I also ran:

%sh
openssl s_client -debug -connect <broker>:<port> -tls1_2

and received:

Start Time: 1753464158
Timeout   : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes

Based on that, I assume there are no connectivity issues. Next, I started a streaming query with:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9096,broker2:9096") \
    .option("subscribe", "topic") \
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("group.id", "group_read_1") \
    .load()

display(df)

However, it just hangs while initializing the stream, and after a few minutes it fails with:

java.util.concurrent.ExecutionException: 
kafkashaded.org.apache.kafka.common.errors.TimeoutException: 
Timed out waiting for a node assignment. Call: describeTopics

Here’s the strange part: when I use the confluent_kafka library and define a consumer, I can successfully read the topic and display the offsets:

from confluent_kafka import Consumer, KafkaException
import pandas as pd
import time

config_consumer = {
    'bootstrap.servers': 'broker1:9096,broker2:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'user',
    'sasl.password': 'pass',
    'group.id': 'group_1',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config_consumer)
topic = 'topic'
consumer.subscribe([topic])

test = []
t_end = time.time() + 5
try:
    while time.time() < t_end:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        test.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8')
        })
finally:
    consumer.close()

df = pd.DataFrame(test)
display(df)

I don’t know what could be wrong. Any help would be greatly appreciated!

1 ACCEPTED SOLUTION


Victor_Cruz_Mex
New Contributor III

We found the solution! Thanks to a Databricks architect, here's how we ultimately fixed it:

1. Copy the JVM's cacerts into a volume so Spark can trust Amazon's MSK certificate bundle. From a notebook cell with shell access, run:

%sh
JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
cp \
  $JAVA_HOME/lib/security/cacerts \
  /Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks
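The same copy can be done from a Python cell instead of `%sh`; a minimal sketch, where `copy_jvm_cacerts` is an illustrative helper (not a Databricks API) and the volume path is a placeholder:

```python
import os
import shutil

def copy_jvm_cacerts(java_home: str, dest: str) -> str:
    """Copy the JVM's default cacerts file (a JKS truststore) to dest.

    java_home: root of the JDK/JRE installation.
    dest: target path, e.g. a Unity Catalog volume path.
    """
    src = os.path.join(java_home, "lib", "security", "cacerts")
    if not os.path.exists(src):
        raise FileNotFoundError(f"No cacerts found at {src}")
    dest_dir = os.path.dirname(dest)
    if dest_dir:
        os.makedirs(dest_dir, exist_ok=True)
    shutil.copyfile(src, dest)
    return dest

# Example (paths are placeholders for your environment):
# copy_jvm_cacerts(os.environ["JAVA_HOME"],
#                  "/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks")
```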

2. Switch your JAAS config to the SCRAM login module and point Spark at the new truststore. In your spark.readStream call:

df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<broker1:9096,broker2:9096>")
         .option("subscribe", "<topic>")
         # Use the SCRAM login module for SCRAM-SHA-512
         .option(
             "kafka.sasl.jaas.config",
             'org.apache.kafka.common.security.scram.ScramLoginModule '
             'required username="<user>" password="<password>";'
         )
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
         # Truststore location on your mounted volume
         .option(
             "kafka.ssl.truststore.location",
             "/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks"
         )
         # Default Java keystore password (“changeit” unless you’ve overridden it)
         .option("kafka.ssl.truststore.password", "changeit")
         .option("maxOffsetsPerTrigger", 10)
         .option("group.id", "group_read_1")
         .load()
)

display(df)

Because Spark runs inside its own JVM, it didn't pick up the bundled Java certs by default. Copying the cacerts file into a volume and pointing kafka.ssl.truststore.location at it lets Spark complete the TLS handshake. Switching to the official SCRAM login module (org.apache.kafka.common.security.scram.ScramLoginModule) ensures compatibility with SCRAM-SHA-512. With those two changes in place, the stream connected immediately and ran without errors.
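Since the question's reader and the fixed one share most of their settings, the Kafka options can be assembled once as a dict and passed via `.options(**...)`; a sketch, where `kafka_reader_options` is an illustrative helper and all credentials and paths are placeholders:

```python
def kafka_reader_options(bootstrap: str, topic: str, user: str,
                         password: str, truststore: str) -> dict:
    """Assemble the Kafka source options used above as a single dict."""
    jaas = (
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        f'required username="{user}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap,
        "subscribe": topic,
        "kafka.sasl.jaas.config": jaas,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "kafka.ssl.truststore.location": truststore,
        # Default Java keystore password, unless you've overridden it
        "kafka.ssl.truststore.password": "changeit",
        "maxOffsetsPerTrigger": "10",
    }

# Usage on Databricks (placeholders):
# df = (spark.readStream.format("kafka")
#            .options(**kafka_reader_options("<broker1:9096,broker2:9096>",
#                                            "<topic>", "<user>", "<password>",
#                                            "/Volumes/<catalog>/<schema>/<volume>/kafka.client.truststore.jks"))
#            .load())
```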


