10-07-2022 06:45 AM
I am facing an error when trying to write data on Kafka using spark stream.
#Extract
source_stream_df= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/storageaccountr01")
.options(**cfg)
.load())
#Transfrom
kafka_target_df=outputDF.selectExpr("id as key","""to_json(struct(*)) as value""")
#Load
bootstrap_server="kafka-0-client.company-domin.svc.prod1-us-central1.gke.kaas-prod-us.gcp.extscloud.com:port"
topic_name="topic-downstream-data-nonprod"
keystore_loc="dbfs:/mnt/secrets/certificate/myc/keystore.jks"
keystore_pwd="**"
truststore_loc="dbfs:/mnt/secrets/certificate/myc/truststore.jks"
truststore_pwd="**"
message_df = (kafka_target_df
.writeStream
.outputMode("append")
.queryName("FlattenRecordsWriter")
.format("kafka")
.option("topic", topic_name)
.option("kafka.bootstrap.servers", bootstrap_server)
.option("security.protocol", "SSL") #SASL_SSL | SSL
.option("ssl.truststore.location", truststore_loc)
.option("ssl.truststore.password",truststore_pwd)
.option("ssl.keystore.location", keystore_loc)
.option("ssl.keystore.password", keystore_pwd)
.option("checkpointLocation", "/mnt/storageaccountr01/logs/check")
.start())
river stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
Caused by: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Topic umr-cps-nonprod.tic.stuff not present in metadata after 60000 ms.
10-07-2022 10:53 AM
@Abhishek Tomar : Are you able to reach the Kafka? Looks like there are network configuration issues.
Could you please double-check the routing configured including the peering connection?
Thanks
10-07-2022 10:59 AM
10-07-2022 11:11 AM
There is any other way to check the connect between my Databricks cluster and Kafka broker?
10-28-2022 03:49 PM
You can do the following:
%sh nc -zv {hostname} {port}
%sh telnet {hostname} {port}
to check connectivity
10-16-2022 03:17 AM
Please check the connection with the telnet to detect potential network issues. To run telnet, you need to use the magic command %sh.
Below is the code which I am using to connect to Kafka confluent cloud:
1. inputDF = (spark
2. .readStream
3. .format("kafka")
4. .option("kafka.bootstrap.servers", host)
5. .option("kafka.ssl.endpoint.identification.algorithm", "https")
6. .option("kafka.sasl.mechanism", "PLAIN")
7. .option("kafka.security.protocol", "SASL_SSL")
8. .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
9. .option("subscribe", topic)
10. .option("kafka.client.id", "Databricks")
11. .option("kafka.group.id", "new_group2")
12. .option("spark.streaming.kafka.maxRatePerPartition", "5")
13. .option("startingOffsets", "earliest")
14. .option("kafka.session.timeout.ms", "10000")
.load() )
10-29-2022 06:27 PM
BIU$I
09-10-2024 02:54 AM
What event hub namespace you were using?
I had same problem and resolved by changing pricing plan from basic to standard as Kafka apps is not supporting in basic plan
Let me know if you had anything else. Thanks
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group