kafkashaded.org.apache.kafka.common.errors.TimeoutException: topic-downstream-data-nonprod not present in metadata after 60000 ms.
10-07-2022 06:45 AM
I am facing an error when trying to write data to Kafka using Spark Structured Streaming.
#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
#Transform
kafka_target_df = source_stream_df.selectExpr("id AS key", "to_json(struct(*)) AS value")
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
message_df = (kafka_target_df
.writeStream
.outputMode("append")
.queryName("FlattenRecordsWriter")
.format("kafka")
.option("topic", topic_name)
.option("kafka.bootstrap.servers", bootstrap_server)
.option("security.protocol", "SSL") #SASL_SSL | SSL
.option("ssl.truststore.location", truststore_loc)
.option("ssl.truststore.password",truststore_pwd)
.option("ssl.keystore.location", keystore_loc)
.option("ssl.keystore.password", keystore_pwd)
.option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
.start())
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.
10-07-2022 10:53 AM
@Abhishek Tomar : Are you able to reach the Kafka broker? It looks like there are network configuration issues.
Could you please double-check the configured routing, including the peering connection?
Thanks
10-07-2022 10:59 AM
Thanks,
Abhishek
10-07-2022 11:11 AM
Is there any other way to check the connection between my Databricks cluster and the Kafka broker?
10-28-2022 03:49 PM
You can run either of the following to check connectivity:
%sh nc -zv {hostname} {port}
%sh telnet {hostname} {port}
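If nc or telnet is not installed on your cluster image, a quick Python socket probe gives the same signal (a minimal sketch; the hostname and port below are placeholders for your broker's):
import socket

def check_connectivity(hostname, port, timeout=5):
    # Plain TCP connect; fails fast on DNS errors, timeouts, or refused connections
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            print(f"OK: reached {hostname}:{port}")
    except OSError as e:
        print(f"FAILED: {hostname}:{port} -> {e}")

check_connectivity("kafka-0-client.example.com", 9093)  # placeholder host and port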
10-16-2022 03:17 AM
Please check the connection with telnet to detect potential network issues. To run telnet, use the %sh magic command.
Below is the code I am using to connect to Kafka on Confluent Cloud:
inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
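Also note that in your write stream the producer settings are missing the kafka. prefix. The Structured Streaming Kafka sink only forwards options prefixed with kafka. to the underlying client, so security.protocol and the ssl.* options as written are likely being ignored. Here is a sketch of your writer with prefixed options (topic, bootstrap server, and passwords as in your post; the /dbfs/... paths are my assumption, since the Kafka client reads the JKS files from the local filesystem and cannot resolve dbfs:/ URIs):
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    # Producer-level settings need the "kafka." prefix to reach the client
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "/dbfs/mnt/secrets/certificate/myc/truststore.jks")  # assumed local FUSE path
    .option("kafka.ssl.truststore.password", truststore_pwd)
    .option("kafka.ssl.keystore.location", "/dbfs/mnt/secrets/certificate/myc/keystore.jks")  # assumed local FUSE path
    .option("kafka.ssl.keystore.password", keystore_pwd)
    # checkpointLocation is a Spark option, so it stays unprefixed
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())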
09-10-2024 02:54 AM
Which Event Hubs namespace were you using?
I had the same problem and resolved it by changing the pricing tier from Basic to Standard, since the Kafka endpoint is not supported on the Basic tier.
Let me know if you run into anything else. Thanks.
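Once the namespace is on the Standard tier, the Kafka endpoint is reachable on port 9093 with SASL_SSL and the PLAIN mechanism, using the literal username $ConnectionString and the namespace connection string as the password. A minimal sketch (the namespace name, secret scope, and topic are placeholders for your own values):
EH_NAMESPACE = "mynamespace"  # placeholder: your Event Hubs namespace
# Placeholder secret lookup; store the connection string wherever you keep secrets
EH_CONN_STR = dbutils.secrets.get(scope="kafka", key="eh-connection-string")

eh_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{EH_NAMESPACE}.servicebus.windows.net:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Event Hubs expects the literal username "$ConnectionString"
    .option("kafka.sasl.jaas.config",
            "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
            + f'username="$ConnectionString" password="{EH_CONN_STR}";')
    .option("subscribe", "my-topic")  # placeholder topic / event hub name
    .load())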