10-07-2022 06:45 AM
I am facing an error when trying to write data to Kafka using Spark Structured Streaming.
# Extract
source_stream_df = (spark.readStream
    .format("cosmos.oltp.changeFeed")
    .option("spark.cosmos.container", PARM_CONTAINER_NAME)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")
    .option("spark.cosmos.changeFeed.mode", "Incremental")
    .option("path", "/mnt/storageaccountr01")
    .options(**cfg)
    .load())
# Transform
kafka_target_df = source_stream_df.selectExpr("id as key", "to_json(struct(*)) as value")
# Load
bootstrap_server = "kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name = "topic-downstream-data-nonprod"
keystore_loc = "dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd = "**"
truststore_loc = "dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd = "**"
message_df = (kafka_target_df
    .writeStream
    .outputMode("append")
    .queryName("FlattenRecordsWriter")
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("security.protocol", "SSL")  # SASL_SSL | SSL
    .option("ssl.truststore.location", truststore_loc)
    .option("ssl.truststore.password", truststore_pwd)
    .option("ssl.keystore.location", keystore_loc)
    .option("ssl.keystore.password", keystore_pwd)
    .option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
    .start())
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.
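One thing worth noting about the writeStream snippet above: Spark's Kafka sink only forwards settings whose keys carry the `kafka.` prefix to the underlying producer, so bare keys such as `security.protocol` or `ssl.truststore.location` are silently ignored and the producer falls back to a PLAINTEXT connection, which can also surface as this "not present in metadata" timeout. A minimal sketch of prefixing the SSL settings (the helper name is hypothetical):

```python
def with_kafka_prefix(options):
    # Spark's Kafka source/sink only passes "kafka."-prefixed keys
    # through to the Kafka client; bare keys are ignored.
    return {"kafka." + key: value for key, value in options.items()}

ssl_options = with_kafka_prefix({
    "security.protocol": "SSL",
    "ssl.truststore.location": "dbfs:/mnt/secrets/certificate/myc/truststore.jks",
    "ssl.truststore.password": "**",
    "ssl.keystore.location": "dbfs:/mnt/secrets/certificate/myc/keystore.jks",
    "ssl.keystore.password": "**",
})
```

The resulting dict can then be applied to the same builder with `.options(**ssl_options)`.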
10-07-2022 10:53 AM
@Abhishek Tomar : Are you able to reach Kafka? It looks like there are network configuration issues.
Could you please double-check the configured routing, including the peering connection?
Thanks
10-07-2022 11:11 AM
Is there any other way to check the connectivity between my Databricks cluster and the Kafka broker?
10-28-2022 03:49 PM
You can do the following to check connectivity:
%sh nc -zv {hostname} {port}
%sh telnet {hostname} {port}
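If `nc` or `telnet` is not installed on the cluster image, the same TCP reachability check can be sketched in pure Python from a notebook cell (the function name is illustrative):

```python
import socket

def can_connect(host, port, timeout=5.0):
    # Attempt a plain TCP connection; True means the port is reachable
    # from this machine within the timeout.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Usage against a broker, e.g.:
# can_connect("kafka-0-client.example.com", 9093)
```

A `False` result here points at routing, peering, or firewall issues rather than the Spark configuration.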
10-16-2022 03:17 AM
Please check the connection with telnet to detect potential network issues. To run telnet from a notebook, use the %sh magic command.
Below is the code I am using to connect to Kafka on Confluent Cloud:
inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .load())
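The long `kafka.sasl.jaas.config` line in the snippet above is plain string formatting; pulling it into a small helper (illustrative, using the shaded Kafka class name bundled with Databricks) keeps the credential handling readable:

```python
def jaas_config(username, password):
    # Build the JAAS entry for SASL/PLAIN. The "kafkashaded." package
    # prefix matches the shaded Kafka classes shipped with Databricks.
    return (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        "required username='{}' password='{}';".format(username, password)
    )
```

It would then be applied as `.option("kafka.sasl.jaas.config", jaas_config(userid, password))`.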