01-05-2023 10:51 AM
Hello, I'm a newbie in this field and am trying to access a Confluent Kafka stream in Databricks on Azure, based on a beginner's video by Databricks. I have a free-trial Databricks cluster right now. When I run the notebook below, it errors out on line 5, on the scope.
My question is: should I create the scope in Confluent or in Databricks? Has anybody seen this before?
Thanks so much!!
Julie
01-05-2023 10:53 AM
For testing, start without a secret scope. It's unsafe, but you can paste the secrets as plain strings in the notebook just for testing. Here is the code I used for loading data from Confluent:
# host, userid, password, and topic are plain strings defined earlier in the notebook (testing only)
inputDF = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", host)  # Confluent Cloud bootstrap server, host:port
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  # Confluent API key/secret go into the JAAS config; note the shaded class name used on Databricks
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(userid, password))
  .option("subscribe", topic)
  .option("kafka.client.id", "Databricks")
  .option("kafka.group.id", "new_group2")  # consumer group ID; see the discussion below
  .option("spark.streaming.kafka.maxRatePerPartition", "5")  # legacy DStreams setting; Structured Streaming uses maxOffsetsPerTrigger for rate limiting
  .option("startingOffsets", "earliest")
  .option("kafka.session.timeout.ms", "10000")
  .option("minPartitions", sc.defaultParallelism)  # PySpark attribute is defaultParallelism
  .load())
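Once it works and you want to do it properly, the scope belongs in Databricks (not Confluent): you create a Databricks secret scope and read the Confluent API key and secret from it. A minimal sketch, assuming a Databricks-backed scope named "confluent" with hypothetical key names (create the scope and keys first via the Databricks CLI or UI):
# Scope and key names below are placeholders -- create them yourself first, e.g. with the Databricks CLI:
#   databricks secrets create-scope --scope confluent
#   databricks secrets put --scope confluent --key kafka-api-key
#   databricks secrets put --scope confluent --key kafka-api-secret
userid = dbutils.secrets.get(scope="confluent", key="kafka-api-key")
password = dbutils.secrets.get(scope="confluent", key="kafka-api-secret")
Then the readStream code above stays exactly the same, just without plain-text credentials in the notebook.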
01-06-2023 07:49 AM
Hey Hubert,
First of all, thanks for your response. Sure, I'll try this instead.
For line 11, kafka.group.id, will that have to come from Confluent Cloud?
Thanks,
Julie
01-06-2023 07:53 AM
It's a name you define yourself to control offsets (the consumer group ID). I think that once you put it in the Databricks code, it will be created in Kafka/Confluent automatically.
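To check whether it can actually read your topic, you can display a few decoded records; if rows show up, the connection and subscription are working. A quick sketch, assuming the inputDF from the snippet above:
# Kafka delivers key/value as binary -- cast to string to eyeball the records
from pyspark.sql.functions import col

display(inputDF.select(
  col("key").cast("string"),
  col("value").cast("string"),
  "topic", "partition", "offset"))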
01-06-2023 09:24 AM
Or does this even mean it's able to access my topic?
Thanks for your time Hubert. I really appreciate it.
Julie