
kafkashaded.org.apache.kafka.common.errors.TimeoutException: topic-downstream-data-nonprod not present in metadata after 60000 ms.

ImAbhishekTomar
New Contributor III

I am getting an error when trying to write data to Kafka using Spark Structured Streaming.

#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
 
#Transform
kafka_target_df=outputDF.selectExpr("id as key","""to_json(struct(*)) as value""")
 
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
 
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("security.protocol", "SSL") #SASL_SSL | SSL     
    .option("ssl.truststore.location", truststore_loc)
    .option("ssl.truststore.password",truststore_pwd)
    .option("ssl.keystore.location", keystore_loc)
    .option("ssl.keystore.password", keystore_pwd)
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())
 

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.

6 REPLIES

Sivaprasad1
Valued Contributor II

@Abhishek Tomar: Are you able to reach the Kafka broker? It looks like there may be a network configuration issue.

Could you please double-check the configured routing, including the peering connection?

Thanks

@Sivaprasad - when I telnet to my broker and port from Databricks, it shows as connected.
Thanks,
Abhishek

Is there any other way to check the connection between my Databricks cluster and the Kafka broker?

You can do the following:

%sh nc -zv {hostname} {port}

%sh telnet {hostname} {port}

to check connectivity
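If nc or telnet is not available on the cluster, the same TCP check can be run from a Python notebook cell. This is a minimal sketch using only the standard library; `can_connect` is a hypothetical helper name, and the host and port in the usage comment are placeholders. Note that a successful TCP connection only shows the port is reachable from the driver; it does not prove that the TLS handshake or topic metadata lookup will succeed.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened within timeout seconds."""
    try:
        # create_connection resolves the hostname and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Usage (placeholders, replace with your broker host and port):
# print(can_connect("my-broker.example.com", 9093))
```

This runs on the driver only; executors may sit behind different network rules, so a check from inside a Spark task can still be worthwhile.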

Hubert-Dudek
Esteemed Contributor III

Please check the connection with telnet to detect potential network issues. To run telnet in a notebook, use the %sh magic command.

Below is the code I use to connect to Kafka on Confluent Cloud:

inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
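
In the snippet above, every Kafka client setting (security protocol, SASL mechanism, JAAS config) carries a `kafka.` prefix, which is how Spark's Kafka source and sink forward settings to the underlying Kafka client. The writer in the original question passes `security.protocol` and the `ssl.*` options without that prefix, so those settings may not be reaching the producer. The following is a minimal sketch, not a confirmed fix for this thread: `kafka_writer_options` is a hypothetical helper (not part of Spark), and the variable names in the usage comment are taken from the question.

```python
def kafka_writer_options(bootstrap_servers, topic, client_settings):
    """Build an options dict for a Spark Kafka sink.

    Kafka client settings (for example security.protocol or ssl.*) get the
    "kafka." prefix if they do not already have it. "topic" is a Spark-level
    option and is left unprefixed.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
    }
    for name, value in client_settings.items():
        key = name if name.startswith("kafka.") else "kafka." + name
        options[key] = value
    return options


# Usage sketch with the names from the question (assumed to be defined):
# opts = kafka_writer_options(bootstrap_server, topic_name, {
#     "security.protocol": "SSL",
#     "ssl.truststore.location": truststore_loc,
#     "ssl.truststore.password": truststore_pwd,
#     "ssl.keystore.location": keystore_loc,
#     "ssl.keystore.password": keystore_pwd,
# })
# (kafka_target_df.writeStream.format("kafka").options(**opts)
#     .option("checkpointLocation", "/mnt/storageaccountr01/logs/check").start())
```

Keeping the prefixing in one place makes it harder to miss an option; whether this resolves the "not present in metadata" timeout here also depends on the topic existing and the certificates being readable by the cluster.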

