Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

kafkashaded.org.apache.kafka.common.errors.TimeoutException: topic-downstream-data-nonprod not present in metadata after 60000 ms.

ImAbhishekTomar
New Contributor III

I am getting an error when trying to write data to Kafka using Spark streaming.

#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
 
#Transform
kafka_target_df=source_stream_df.selectExpr("id as key","""to_json(struct(*)) as value""")
 
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
 
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("security.protocol", "SSL") #SASL_SSL | SSL     
    .option("ssl.truststore.location", truststore_loc)
    .option("ssl.truststore.password",truststore_pwd)
    .option("ssl.keystore.location", keystore_loc)
    .option("ssl.keystore.password", keystore_pwd)
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())
 

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.

7 REPLIES

Sivaprasad1
Valued Contributor II

@Abhishek Tomar: Are you able to reach the Kafka brokers? This looks like a network configuration issue.

Could you please double-check the routing configuration, including the peering connection?

Thanks

@Sivaprasad - when I telnet to my broker and port from Databricks, it shows as connected.
Thanks,
Abhishek

Is there any other way to check the connection between my Databricks cluster and the Kafka broker?

You can run either of the following to check connectivity:

%sh nc -zv {hostname} {port}

%sh telnet {hostname} {port}
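
If nc or telnet is not available on the cluster, the same TCP reachability check can be sketched in Python from a notebook cell. This is a minimal sketch, not an official Databricks utility: the function name can_connect is made up for illustration, and the host and port you pass in are your own broker details. Note that it only confirms that a TCP connection opens. It does not validate the TLS handshake or the Kafka protocol, so a "connected" result does not rule out SSL misconfiguration.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the host name and attempts a TCP connect
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False


# Hypothetical usage: substitute your own broker host and port
# can_connect("kafka-broker.example.internal", 9093)
```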

Hubert-Dudek
Esteemed Contributor III

Please check the connection with telnet to detect potential network issues. To run telnet from a notebook, use the %sh magic command.

Below is the code I use to connect to Kafka on Confluent Cloud:

inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
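
One detail worth noting in the example above: every Kafka client setting carries the kafka. prefix. Spark's Kafka source and sink pass only kafka.-prefixed options through to the underlying Kafka client, while the write in the question sets security.protocol and the ssl.* options without that prefix. A client that ends up talking plaintext to an SSL listener is one possible cause of a "not present in metadata" timeout, though this thread does not confirm it was the cause here. Below is a minimal sketch of a helper that applies the prefix. The helper name with_kafka_prefix and the placeholder values are assumptions for illustration. The /dbfs/... local-path form for the keystore files is also an assumption (the question uses dbfs:/ URIs), based on the Kafka client reading these files through the local file system.

```python
def with_kafka_prefix(client_conf):
    """Return a copy of client_conf with 'kafka.' prepended to keys that lack it."""
    return {
        (key if key.startswith("kafka.") else f"kafka.{key}"): value
        for key, value in client_conf.items()
    }


# Placeholder values modelled on the question; replace with your own.
ssl_conf = {
    "bootstrap.servers": "broker-host:port",
    "security.protocol": "SSL",
    # Assumption: local /dbfs/ paths rather than dbfs:/ URIs
    "ssl.truststore.location": "/dbfs/mnt/secrets/certificate/myc/truststore.jks",
    "ssl.truststore.password": "**",
    "ssl.keystore.location": "/dbfs/mnt/secrets/certificate/myc/keystore.jks",
    "ssl.keystore.password": "**",
}
kafka_options = with_kafka_prefix(ssl_conf)

# Usage with the question's stream (needs a live Spark session, so left commented):
# (kafka_target_df.writeStream
#     .format("kafka")
#     .options(**kafka_options)
#     .option("topic", topic_name)
#     .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
#     .start())
```

The topic and checkpointLocation options stay unprefixed because they are Spark options, not Kafka client settings.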

Zainaboladokun
New Contributor III

BIU$I

devmehta
New Contributor III

Which Event Hubs namespace were you using?

I had the same problem and resolved it by changing the pricing tier from Basic to Standard, since the Basic tier does not support Kafka applications.

Let me know if you have any other questions. Thanks
