Kafka timeout
09-05-2023 12:04 PM
Hello,
I am trying to read topics from a Kafka stream, but I am getting the timeout error below.
23/09/05 18:30:52 INFO NetworkClient: [AdminClient clientId=Databricks] Disconnecting from node 4 due to socket connection setup timeout. The timeout value is 11054 ms.
I can ping the Kafka broker from Databricks; the error seems to occur when I try to grab data.
Example code.
inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format("123", "456"))
    .option("subscribe", topic)
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
display(inputDF)
Does anyone have any inkling as to why this might be happening?
09-06-2023 06:59 AM
@Retired_mod Thanks for the reply.
I don't seem to have a problem with the connection. Running a connectivity check returns:
Connection to xxx.aws.confluent.cloud (xx.xx.xxx.xx) 9092 port [tcp/*] succeeded!
But the timeout happens when I try to actually retrieve data using Spark, as shown in the sample code above, i.e. after:
display(inputDF)
Is there anything else that I am overlooking?
09-06-2023 10:11 PM
As the error shows, the failure is happening during DescribeTopics: the client is timing out while trying to communicate with the broker nodes. You can check with the Kafka team to see whether the brokers are communicating properly with the controller.
Getting the broker logs will help us.
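While waiting for broker logs, the driver log itself can show which nodes are timing out. Below is a minimal Python sketch that counts the "socket connection setup timeout" messages per node id; the regular expression is written against the single log line quoted in the original post, and the helper name timed_out_nodes is made up for illustration. As far as I know, the Kafka client assigns negative ids to bootstrap servers, so a non-negative id such as 4 would point to a broker address learned from cluster metadata, which may not be reachable from the cluster even when the bootstrap address is.

```python
import re
from collections import Counter

# Pattern based on the client-side message quoted in the original post.
TIMEOUT_RE = re.compile(
    r"Disconnecting from node (?P<node>-?\d+) due to socket connection setup timeout\. "
    r"The timeout value is (?P<ms>\d+) ms"
)


def timed_out_nodes(log_lines):
    """Count connection-setup timeouts per Kafka node id (hypothetical helper)."""
    counts = Counter()
    for line in log_lines:
        match = TIMEOUT_RE.search(line)
        if match:
            counts[int(match.group("node"))] += 1
    return counts


# The log line from the original post, used as sample input.
sample = [
    "23/09/05 18:30:52 INFO NetworkClient: [AdminClient clientId=Databricks] "
    "Disconnecting from node 4 due to socket connection setup timeout. "
    "The timeout value is 11054 ms.",
]
print(timed_out_nodes(sample))
```

If only some node ids show up, comparing them with the brokers' advertised listener addresses is a reasonable next step.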
05-01-2024 03:21 AM
@kwasi -- were you able to fix this? I am facing this issue now and any help / leads would greatly help me out 🙂
05-01-2024 03:39 AM
Hi @Murthy1 ,
Are you able to connect to Kafka from Databricks, and are the brokers healthy? The error indicates that Databricks is unable to connect to the Kafka cluster, possibly due to network issues or incorrect configuration.
You can run the nc command from a notebook to validate the connectivity.
Thanks!
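If nc is not convenient, roughly the same check can be done in Python from a notebook cell. This is only a sketch: can_connect and check_brokers are illustrative names, and the broker addresses in the usage comment are placeholders. A successful TCP connection to the bootstrap address does not prove that every broker is reachable, so it may be worth checking each broker's advertised host and port.

```python
import socket


def can_connect(host, port, timeout_s=5.0):
    """Return True if a TCP connection to host:port opens within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures.
        return False


def check_brokers(brokers, timeout_s=5.0):
    """Map 'host:port' to reachability for every (host, port) pair given."""
    return {f"{host}:{port}": can_connect(host, port, timeout_s) for host, port in brokers}


# Usage (placeholder addresses, replace with your brokers' advertised listeners):
# print(check_brokers([("broker-0.example.com", 9092), ("broker-1.example.com", 9092)]))
```

This only tests TCP reachability; it does not exercise TLS or SASL authentication.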
05-01-2024 05:51 AM
05-21-2024 02:54 AM
Hi @Murthy1,
Is this an intermittent issue, or are you facing it regularly? The failure occurs while fetching the topic-level metadata.
I checked internally on this, and it may be a network issue. We may have to do a deeper dive.
09-10-2024 02:55 AM
Which Event Hubs namespace were you using?
I had the same problem and resolved it by changing the pricing plan from Basic to Standard, as Kafka apps are not supported on the Basic plan.
Let me know if you have any other questions. Thanks
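For readers who are on Event Hubs rather than a regular Kafka cluster, here is a hedged sketch of the reader options its Kafka endpoint usually expects: port 9093, SASL_SSL with PLAIN, and the literal user name $ConnectionString with the connection string as the password. The function name and its arguments are made up for illustration, and the code cannot verify the pricing tier; that still has to be checked in the Azure portal.

```python
def event_hubs_kafka_options(namespace, connection_string, event_hub):
    """Build Kafka reader options for an Event Hubs namespace (illustrative helper)."""
    # The shaded class prefix matches the one used in the original post on Databricks.
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": event_hub,
    }


# Usage in a notebook (all values are placeholders):
# opts = event_hubs_kafka_options("my-namespace", "<connection string>", "my-event-hub")
# df = spark.readStream.format("kafka").options(**opts).load()
```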
10-31-2024 02:43 AM
If all the security configurations are correct, mainly kafka.sasl.jaas.config, then consider checking the Kafka broker logs for more detailed error messages that might provide further insight.
10-31-2024 03:44 AM
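Try this:
.option('kafka.session.timeout.ms', 200000)
.option('group.max.session.timeout.ms', 7200000)
- kafka.session.timeout.ms: Sets the consumer session timeout, i.e. how long the group coordinator waits for heartbeats before treating the consumer as failed.
- group.max.session.timeout.ms: Sets the maximum session timeout that consumers in a group may request. This is a broker-side setting, so it normally has to be configured on the brokers rather than passed as a reader option.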
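Note that the message in the original post mentions a socket connection setup timeout, which the session timeout settings above do not control. In recent Kafka clients this is governed by socket.connection.setup.timeout.ms and socket.connection.setup.timeout.max.ms. The sketch below builds those options with the kafka. prefix; the helper name and the default values are arbitrary choices for illustration, and whether the Kafka client bundled with your Databricks runtime honors them is an assumption you would need to verify. Longer timeouts only help with slow connection setup, not with a broker address that is unreachable.

```python
def connection_timeout_options(setup_ms=30000, setup_max_ms=60000, request_ms=60000):
    """Build Kafka client timeout options for a Spark reader (illustrative helper)."""
    if setup_max_ms < setup_ms:
        raise ValueError("setup_max_ms must be at least setup_ms")
    return {
        # Initial wait for the TCP connection to a broker to be established.
        "kafka.socket.connection.setup.timeout.ms": str(setup_ms),
        # Upper bound the client backs off to after repeated setup failures.
        "kafka.socket.connection.setup.timeout.max.ms": str(setup_max_ms),
        # How long the client waits for a response to a request.
        "kafka.request.timeout.ms": str(request_ms),
    }


# Usage in a notebook, combined with the existing reader options:
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", kafka_broker)
#       .option("subscribe", topic)
#       .options(**connection_timeout_options())
#       .load())
```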