Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Kafka timeout

kwasi
New Contributor II

Hello,
I am trying to read topics from a Kafka stream, but I am getting the timeout error below.

java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: describeTopics


23/09/05 18:30:52 INFO NetworkClient: [AdminClient clientId=Databricks] Disconnecting from node 4 due to socket connection setup timeout. The timeout value is 11054 ms.

I can ping the Kafka broker from Databricks; the error seems to occur when I try to grab data.
Example code:

 

inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format("123", "456"))
    .option("subscribe", topic)
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
display(inputDF)

 

Does anyone have any inkling as to why this might be happening?

9 REPLIES

kwasi
New Contributor II

@Retired_mod Thanks for the reply.
I don't seem to have a problem with the connection. Running

%sh nc -zv xxx.aws.confluent.cloud 9092

results in

Connection to xxx.aws.confluent.cloud (xx.xx.xxx.xx) 9092 port [tcp/*] succeeded!

But the timeout happens when I try to actually retrieve data using Spark, as shown in the sample code above, i.e. after calling

display(inputDF)

Is there anything else that I am overlooking?




Tharun-Kumar
Databricks Employee

@kwasi 

As we can see from the error, the failure is happening during DescribeTopics. You can check with the Kafka team to see if the brokers are communicating fine with the controller. It is timing out while trying to communicate with the nodes. 
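One way to see which node the client is timing out on is to fetch the cluster metadata outside Spark and print the address each broker advertises. The sketch below assumes the `confluent-kafka` Python package has been installed on the cluster, which is not something the original post mentions. The function names are hypothetical, and the bootstrap address, API key, and secret in the commented usage are placeholders.

```python
def build_admin_config(bootstrap_servers, api_key, api_secret):
    """Build a librdkafka-style config for SASL_SSL with the PLAIN mechanism."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": api_secret,
    }


def list_advertised_brokers(config, timeout_s=10.0):
    """Return sorted (node_id, host, port) tuples reported in the cluster metadata."""
    # Imported lazily so build_admin_config works without the package installed.
    from confluent_kafka.admin import AdminClient

    admin = AdminClient(config)
    metadata = admin.list_topics(timeout=timeout_s)
    return sorted((b.id, b.host, b.port) for b in metadata.brokers.values())


# Example usage (placeholders, not real credentials):
# conf = build_admin_config("xxx.aws.confluent.cloud:9092", "<api-key>", "<api-secret>")
# for node_id, host, port in list_advertised_brokers(conf):
#     print(node_id, host, port)
```

If the entry for the node named in the log (node 4 in the original error) points to a host or port that the Databricks cluster cannot reach, that would be consistent with the bootstrap check succeeding while describeTopics times out.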

Getting the broker logs will help us.

Murthy1
Contributor II

@kwasi -- were you able to fix this? I am facing this issue now and any help / leads would greatly help me out 🙂

NandiniN
Databricks Employee

Hi @Murthy1 ,

Are you able to connect to Kafka from Databricks, and are the brokers healthy? The error indicates that Databricks is unable to connect to the Kafka cluster, possibly due to network issues or incorrect configuration.

We can try the nc command from a notebook to validate the connectivity.
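A rough Python equivalent of the nc check, which can be run from a notebook cell against several endpoints at once, might look like the sketch below. The helper names are hypothetical and the broker addresses in the commented usage are placeholders. A successful check against the bootstrap address does not by itself show that every broker is reachable, because Kafka clients go on to connect to each broker's advertised address after bootstrapping.

```python
import socket


def check_tcp(host, port, timeout_s=5.0):
    """Return True if a TCP connection to host:port can be opened within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def check_brokers(endpoints, timeout_s=5.0):
    """Map each 'host:port' string to whether a TCP connection succeeded."""
    results = {}
    for endpoint in endpoints:
        host, _, port = endpoint.rpartition(":")
        results[endpoint] = check_tcp(host, int(port), timeout_s)
    return results


# Example usage (placeholder addresses):
# print(check_brokers(["xxx.aws.confluent.cloud:9092", "b4-xxx.aws.confluent.cloud:9092"]))
```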

Thanks!

Hello @NandiniN, thanks for responding! Yes, I am able to connect to Confluent Cloud (Kafka) from the notebook with the nc command. I am facing the error when I try to run df.show(), the same as @kwasi. Any help here will be appreciated!

NandiniN
Databricks Employee

Hi @Murthy1

Is this an intermittent issue, or are you facing it regularly? The failure occurs while fetching the topic-level metadata.

I checked internally on this, and it may be a network issue. We may have to do a deeper dive on it.

 

devmehta
New Contributor III

Which Event Hubs namespace were you using?

I had the same problem and resolved it by changing the pricing plan from Basic to Standard, as Kafka apps are not supported in the Basic plan.

Let me know if you have anything else. Thanks

NandiniN
Databricks Employee

If all the security configurations are correct, mainly kafka.sasl.jaas.config, then consider checking the Kafka broker logs for more detailed error messages that might provide further insight.
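To rule out quoting or formatting mistakes in kafka.sasl.jaas.config, it can help to build the string in one place. The sketch below is only an illustration: the helper name is hypothetical, the login module class name is the one from the original post, and the double-quoted values follow the form commonly shown in Kafka documentation examples. The secret scope and key names in the commented usage are placeholders.

```python
SHADED_PLAIN_LOGIN_MODULE = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
)


def build_plain_jaas_config(username, password):
    """Return a JAAS config string for SASL/PLAIN, ending with the required semicolon."""
    if not username or not password:
        raise ValueError("username and password must be non-empty")
    return (
        f"{SHADED_PLAIN_LOGIN_MODULE} required "
        f'username="{username}" password="{password}";'
    )


# Example usage in a Databricks notebook (scope and key names are placeholders):
# jaas = build_plain_jaas_config(
#     dbutils.secrets.get(scope="kafka", key="api-key"),
#     dbutils.secrets.get(scope="kafka", key="api-secret"),
# )
# reader = spark.readStream.format("kafka").option("kafka.sasl.jaas.config", jaas)
```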

saurabh18cs
Valued Contributor II

Try this:


.option('kafka.session.timeout.ms', 200000)
.option('group.max.session.timeout.ms', 7200000)

 

  • kafka.session.timeout.ms: Specifies the timeout used to detect consumer failures.
  • group.max.session.timeout.ms: Sets the maximum allowed session timeout for a consumer group (this is a broker-side configuration).
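Spark's Kafka source expects Kafka client settings to carry a `kafka.` prefix, so one way to keep them apart from source options is to collect them in a dictionary first. The sketch below is illustrative only: the helper name is hypothetical, and `request.timeout.ms` with its value is an extra example that does not come from this thread. Because group.max.session.timeout.ms is configured on the brokers, it is left out here; setting it as a reader option is not expected to change client behavior.

```python
def to_spark_kafka_options(client_config):
    """Prefix Kafka client settings with 'kafka.' and convert the values to strings."""
    return {f"kafka.{key}": str(value) for key, value in client_config.items()}


client_config = {
    "session.timeout.ms": 200000,   # value suggested in the reply above
    "request.timeout.ms": 60000,    # illustrative assumption, not from the thread
}
spark_options = to_spark_kafka_options(client_config)

# Example usage in a notebook:
# reader = spark.readStream.format("kafka").options(**spark_options)
```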
