
kafkashaded.org.apache.kafka.common.errors.TimeoutException: topic-downstream-data-nonprod not present in metadata after 60000 ms.

ImAbhishekTomar
New Contributor III

I am getting an error when trying to write data to Kafka using Spark Structured Streaming.

#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
 
#Transform
kafka_target_df=outputDF.selectExpr("id as key","""to_json(struct(*)) as value""")
 
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
 
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("security.protocol", "SSL") #SASL_SSL | SSL     
    .option("ssl.truststore.location", truststore_loc)
    .option("ssl.truststore.password",truststore_pwd)
    .option("ssl.keystore.location", keystore_loc)
    .option("ssl.keystore.password", keystore_pwd)
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())
 

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.

6 REPLIES

Sivaprasad1
Valued Contributor II

@Abhishek Tomar: Are you able to reach the Kafka broker? This looks like a network configuration issue.

Could you please double-check the configured routing, including the peering connection?

Thanks

@Sivaprasad - When I telnet to my broker and port from Databricks, it shows as connected.
Thanks,
Abhishek

Is there any other way to check the connection between my Databricks cluster and the Kafka broker?

You can run either of the following in a notebook cell to check connectivity:

%sh nc -zv {hostname} {port}

%sh telnet {hostname} {port}
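
If you would rather run the check from a Python cell, the same TCP-level test can be sketched with the standard library. The function name `can_connect` and the timeout value are illustrative assumptions, not part of any Databricks or Kafka API. Note that a successful TCP connection only shows that the port is reachable; it says nothing about whether the TLS handshake or the Kafka metadata request will succeed.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port can be opened.

    This checks network reachability only. It does not perform a TLS
    handshake and does not speak the Kafka protocol.
    """
    try:
        # create_connection resolves the host name and opens the socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and DNS resolution failures.
        return False


# Example usage with placeholder values (replace with your broker and port):
# print(can_connect("my-broker.example.com", 9093))
```

If this returns True but the stream still times out on metadata, the problem is more likely in the security settings or the topic name than in routing.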

Hubert-Dudek
Esteemed Contributor III

Please check the connection with telnet to detect potential network issues. To run telnet from a notebook, use the %sh magic command.

Below is the code I use to connect to Kafka on Confluent Cloud:

inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
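
One difference from the writer in the question is worth pointing out: in this reader every Kafka client setting carries the `kafka.` prefix, whereas the question passes `security.protocol` and the `ssl.*` settings without it. Spark only forwards options with the `kafka.` prefix to the underlying Kafka client, so the unprefixed SSL settings are probably never applied, which may be what leads to the metadata timeout. The following is a minimal sketch of that idea, not a verified fix. The helper names `kafka_ssl_options` and `start_kafka_writer` are hypothetical, and all paths and passwords are placeholders.

```python
def kafka_ssl_options(bootstrap_servers, truststore_path, truststore_password,
                      keystore_path, keystore_password):
    """Build Spark options for an SSL Kafka connection.

    Hypothetical helper: every Kafka client setting gets the 'kafka.'
    prefix so that Spark passes it through to the Kafka producer.
    """
    client_settings = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
        "ssl.keystore.location": keystore_path,
        "ssl.keystore.password": keystore_password,
    }
    return {f"kafka.{name}": value for name, value in client_settings.items()}


def start_kafka_writer(df, topic, checkpoint_location, options):
    """Start a streaming write of df (with key/value columns) to Kafka.

    'topic' and 'checkpointLocation' are Spark-level options, so they
    are passed without the 'kafka.' prefix.
    """
    return (df.writeStream
            .format("kafka")
            .outputMode("append")
            .options(**options)
            .option("topic", topic)
            .option("checkpointLocation", checkpoint_location)
            .start())


# Example usage with the variable names from the question:
# opts = kafka_ssl_options(bootstrap_server, truststore_loc, truststore_pwd,
#                          keystore_loc, keystore_pwd)
# query = start_kafka_writer(kafka_target_df, topic_name,
#                            "/mnt/storageaccountr01/logs/check", opts)
```

It may also be worth checking whether the Kafka client can read the keystore and truststore through a `dbfs:/` URI; if not, a local-style path to the same files would be needed. That is an assumption to verify on your cluster.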

