<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Spark Structured Streaming Timeout Waiting for KafkaAdminClient Node Assignment on Amazon MSK in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-timeout-waiting-for-kafkaadminclient/m-p/126512#M47703</link>
    <description>&lt;P&gt;Hello! I’m having trouble establishing a Kafka connection between my Databricks notebook and my Kafka server in Amazon MSK. I’ve run some tests and I’m really stuck—I hope someone can help me.&lt;/P&gt;&lt;P&gt;I have two brokers. First, I checked connectivity with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sh
nc -vz &amp;lt;broker&amp;gt; &amp;lt;port&amp;gt;&lt;/LI-CODE&gt;&lt;P&gt;and got this on both:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Connection to &amp;lt;broker&amp;gt; (&amp;lt;ip&amp;gt;) &amp;lt;port&amp;gt; port [tcp/*] succeeded!&lt;/LI-CODE&gt;&lt;P&gt;I also ran:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sh
openssl s_client -debug -connect &amp;lt;broker&amp;gt;:&amp;lt;port&amp;gt; -tls1_2&lt;/LI-CODE&gt;&lt;P&gt;and received:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Start Time: 1753464158
Timeout   : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes&lt;/LI-CODE&gt;&lt;P&gt;Based on that, I assume there are no connectivity issues. Next, I started a streaming query with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9096,broker2:9096") \
    .option("subscribe", "topic") \
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("group.id", "group_read_1") \
    .load()

display(df)&lt;/LI-CODE&gt;&lt;P&gt;However, it just hangs while initializing the stream, and after a few minutes it fails with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;java.util.concurrent.ExecutionException: 
kafkashaded.org.apache.kafka.common.errors.TimeoutException: 
Timed out waiting for a node assignment. Call: describeTopics&lt;/LI-CODE&gt;&lt;P&gt;Here’s the strange part: when I use the confluent_kafka library and define a consumer, I can successfully read the topic and display the offsets:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from confluent_kafka import Consumer, KafkaException
import pandas as pd
import time

config_consumer = {
    'bootstrap.servers': 'broker1:9096,broker2:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'user',
    'sasl.password': 'pass',
    'group.id': 'group_1',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config_consumer)
topic = 'topic'
consumer.subscribe([topic])

test = []
t_end = time.time() + 5
try:
    while time.time() &amp;lt; t_end:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        test.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8')
        })
finally:
    consumer.close()

df = pd.DataFrame(test)
display(df)&lt;/LI-CODE&gt;&lt;P&gt;I don’t know what could be wrong. Any help would be greatly appreciated!&lt;/P&gt;</description>
    <pubDate>Fri, 25 Jul 2025 20:13:52 GMT</pubDate>
    <dc:creator>Victor_Cruz_Mex</dc:creator>
    <dc:date>2025-07-25T20:13:52Z</dc:date>
    <item>
      <title>Spark Structured Streaming Timeout Waiting for KafkaAdminClient Node Assignment on Amazon MSK</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-timeout-waiting-for-kafkaadminclient/m-p/126512#M47703</link>
      <description>&lt;P&gt;Hello! I’m having trouble establishing a Kafka connection between my Databricks notebook and my Kafka server in Amazon MSK. I’ve run some tests and I’m really stuck—I hope someone can help me.&lt;/P&gt;&lt;P&gt;I have two brokers. First, I checked connectivity with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sh
nc -vz &amp;lt;broker&amp;gt; &amp;lt;port&amp;gt;&lt;/LI-CODE&gt;&lt;P&gt;and got this on both:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Connection to &amp;lt;broker&amp;gt; (&amp;lt;ip&amp;gt;) &amp;lt;port&amp;gt; port [tcp/*] succeeded!&lt;/LI-CODE&gt;&lt;P&gt;I also ran:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sh
openssl s_client -debug -connect &amp;lt;broker&amp;gt;:&amp;lt;port&amp;gt; -tls1_2&lt;/LI-CODE&gt;&lt;P&gt;and received:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Start Time: 1753464158
Timeout   : 7200 (sec)
Verify return code: 0 (ok)
Extended master secret: yes&lt;/LI-CODE&gt;&lt;P&gt;Based on that, I assume there are no connectivity issues. Next, I started a streaming query with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9096,broker2:9096") \
    .option("subscribe", "topic") \
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.scram.ScramLoginModule '
        'required username="user" password="pass";'
    ) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("maxOffsetsPerTrigger", 10) \
    .option("group.id", "group_read_1") \
    .load()

display(df)&lt;/LI-CODE&gt;&lt;P&gt;However, it just hangs while initializing the stream, and after a few minutes it fails with:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;java.util.concurrent.ExecutionException: 
kafkashaded.org.apache.kafka.common.errors.TimeoutException: 
Timed out waiting for a node assignment. Call: describeTopics&lt;/LI-CODE&gt;&lt;P&gt;Here’s the strange part: when I use the confluent_kafka library and define a consumer, I can successfully read the topic and display the offsets:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from confluent_kafka import Consumer, KafkaException
import pandas as pd
import time

config_consumer = {
    'bootstrap.servers': 'broker1:9096,broker2:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'user',
    'sasl.password': 'pass',
    'group.id': 'group_1',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config_consumer)
topic = 'topic'
consumer.subscribe([topic])

test = []
t_end = time.time() + 5
try:
    while time.time() &amp;lt; t_end:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        test.append({
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset(),
            'timestamp': msg.timestamp()[1],
            'value': msg.value().decode('utf-8')
        })
finally:
    consumer.close()

df = pd.DataFrame(test)
display(df)&lt;/LI-CODE&gt;&lt;P&gt;I don’t know what could be wrong. Any help would be greatly appreciated!&lt;/P&gt;</description>
      <pubDate>Fri, 25 Jul 2025 20:13:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-timeout-waiting-for-kafkaadminclient/m-p/126512#M47703</guid>
      <dc:creator>Victor_Cruz_Mex</dc:creator>
      <dc:date>2025-07-25T20:13:52Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Structured Streaming Timeout Waiting for KafkaAdminClient Node Assignment on Amazon MSK</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-timeout-waiting-for-kafkaadminclient/m-p/126860#M47785</link>
      <description>&lt;P&gt;&lt;FONT size="5"&gt;&lt;STRONG&gt;We found the solution! Thanks to a Databricks architect, here’s how we ultimately fixed it:&lt;/STRONG&gt;&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;1.- Copy the JVM’s cacerts into a volume&lt;/STRONG&gt; so Spark can trust the certificates presented by the Amazon MSK brokers. From a notebook cell with shell access, run:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%sh
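# Resolve the JVM home directory from the java binary on PATH (following symlinks)
JAVA_HOME="$(dirname "$(dirname "$(readlink -f "$(which java)")")")"
# Copy the JVM's default CA bundle to the volume under a truststore file name
cp \
  "$JAVA_HOME/lib/security/cacerts" \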
  /Volumes/&amp;lt;catalog&amp;gt;/&amp;lt;schema&amp;gt;/&amp;lt;volume&amp;gt;/kafka.client.truststore.jks&lt;/LI-CODE&gt;&lt;P&gt;&lt;STRONG&gt;2.- Switch your JAAS config to the SCRAM login module&lt;/STRONG&gt; and point Spark at the new truststore. In your spark.readStream call:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "&amp;lt;broker1:9096,broker2:9096&amp;gt;")
         .option("subscribe", "&amp;lt;topic&amp;gt;")
         # Use the SCRAM login module for SCRAM-SHA-512
         .option(
             "kafka.sasl.jaas.config",
             'org.apache.kafka.common.security.scram.ScramLoginModule '
             'required username="&amp;lt;user&amp;gt;" password="&amp;lt;password&amp;gt;";'
         )
         .option("kafka.security.protocol", "SASL_SSL")
         .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
         # Truststore location on your mounted volume
         .option(
             "kafka.ssl.truststore.location",
             "/Volumes/&amp;lt;catalog&amp;gt;/&amp;lt;schema&amp;gt;/&amp;lt;volume&amp;gt;/kafka.client.truststore.jks"
         )
         # Default Java keystore password (“changeit” unless you’ve overridden it)
         .option("kafka.ssl.truststore.password", "changeit")
         .option("maxOffsetsPerTrigger", 10)
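         # Note: Spark forwards only "kafka."-prefixed options to the Kafka client,
         # so this unprefixed option does not set the consumer group; use
         # "kafka.group.id" if a fixed group ID is required.
         .option("group.id", "group_read_1")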
         .load()
)
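
# Optional pre-flight check, added as a hedged sketch (not part of the original fix):
# confirm that the truststore copied in step 1 exists and is non-empty before the
# stream starts. The helper name truststore_looks_usable is hypothetical, and the
# path below is the same placeholder used in the options above.
import os

def truststore_looks_usable(path):
    """Return True if path is an existing, non-empty regular file."""
    return os.path.isfile(path) and os.path.getsize(path) != 0

truststore_path = "/Volumes/&lt;catalog&gt;/&lt;schema&gt;/&lt;volume&gt;/kafka.client.truststore.jks"
if not truststore_looks_usable(truststore_path):
    print(f"Truststore missing or empty: {truststore_path}")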

display(df)&lt;/LI-CODE&gt;&lt;P&gt;Because Spark runs inside its own VM, it didn’t pick up the bundled Java CA certificates by default. Copying the cacerts file into a volume and pointing kafka.ssl.truststore.location at it lets Spark complete the TLS handshake. Switching to the official SCRAM login module (org.apache.kafka.common.security.scram.ScramLoginModule) ensures compatibility with SCRAM-SHA-512. With those two changes in place, the stream connected immediately and ran without errors.&lt;/P&gt;</description>
      <pubDate>Tue, 29 Jul 2025 20:19:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-timeout-waiting-for-kafkaadminclient/m-p/126860#M47785</guid>
      <dc:creator>Victor_Cruz_Mex</dc:creator>
      <dc:date>2025-07-29T20:19:02Z</dc:date>
    </item>
  </channel>
</rss>

