cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

kafkashaded.org.apache.kafka.common.errors.TimeoutException: topic-downstream-data-nonprod not present in metadata after 60000 ms.

ImAbhishekTomar
New Contributor III

I am facing an error when trying to write data on Kafka using spark stream.

#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
 
#Transfrom
kafka_target_df=outputDF.selectExpr("id as key","""to_json(struct(*)) as value""")
 
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
 
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("security.protocol", "SSL") #SASL_SSL | SSL     
    .option("ssl.truststore.location", truststore_loc)
    .option("ssl.truststore.password",truststore_pwd)
    .option("ssl.keystore.location", keystore_loc)
    .option("ssl.keystore.password", keystore_pwd)
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())
 

river stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.

6 REPLIES 6

Sivaprasad1
Valued Contributor II

@Abhishek Tomar​ : Are you able to reach the Kafka? Looks like there are network configuration issues.

Could you please double-check the routing configured including the peering connection?

Thanks

@Sivaprasad - when I’m telnet my broker and port in Databricks then it’s showing connected.
Thanks,
Abhishek

There is any other way to check the connect between my Databricks cluster and Kafka broker?

You can do the following:

%sh nc -zv {hostname} {port}

%sh telnet {hostname} {port}

to check connectivity

Hubert-Dudek
Esteemed Contributor III

Please check the connection with the telnet to detect potential network issues. To run telnet, you need to use the magic command %sh.

Below is the code which I am using to connect to Kafka confluent cloud:

1. inputDF = (spark
2.  .readStream
3.  .format("kafka")
4.  .option("kafka.bootstrap.servers", host)
5.  .option("kafka.ssl.endpoint.identification.algorithm", "https")
6.  .option("kafka.sasl.mechanism", "PLAIN")
7.  .option("kafka.security.protocol", "SASL_SSL")
8.  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
9.  .option("subscribe", topic)
10.  .option("kafka.client.id", "Databricks")
11.  .option("kafka.group.id", "new_group2")
12.  .option("spark.streaming.kafka.maxRatePerPartition", "5")
13.  .option("startingOffsets", "earliest")
14.  .option("kafka.session.timeout.ms", "10000")
 .load() )

Zainaboladokun
New Contributor III

BIU$I

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!