
Scope creation in Databricks or Confluent?

julie
New Contributor III

Hello, I'm a newbie in this field, and I'm trying to access a Confluent Kafka stream in Azure Databricks by following a beginner's video by Databricks. I have a free-trial Databricks cluster right now. When I run the notebook below, it errors out on line 5, on the scope.

My question is: should I create the scope in Confluent or in Databricks? Has anybody seen this before?

[YouTube video link]


Thanks so much!!

Julie

ACCEPTED SOLUTION

Hubert-Dudek
Esteemed Contributor III

For testing, create it without a secret scope. It's unsafe, but for testing you can paste the secrets as plain strings in the notebook. Here is the code I used for loading data from Confluent:

# host, userid, password, and topic are defined earlier in the notebook;
# for a quick test they can be plain strings (unsafe outside testing).
inputDF = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", host)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config",
            "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
            "required username='{}' password='{}';".format(userid, password))
    .option("subscribe", topic)
    .option("kafka.client.id", "Databricks")
    .option("kafka.group.id", "new_group2")
    # note: this DStream-era setting has no effect on Structured Streaming;
    # the closest equivalent is the maxOffsetsPerTrigger source option
    .option("spark.streaming.kafka.maxRatePerPartition", "5")
    .option("startingOffsets", "earliest")
    .option("kafka.session.timeout.ms", "10000")
    .option("minPartitions", sc.defaultParallelism)  # fixed: lowercase "d"
    .load())
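To answer the title question directly: the secret scope is created on the Databricks side, not in Confluent; Confluent only supplies the API key and secret that you store in it. A minimal sketch of the production-safe variant (the scope and key names below are made up for illustration), using the legacy Databricks CLI to create the scope:

databricks secrets create-scope --scope confluent
databricks secrets put --scope confluent --key kafka-username
databricks secrets put --scope confluent --key kafka-password

and then reading the values in the notebook instead of hard-coding them:

userid = dbutils.secrets.get(scope="confluent", key="kafka-username")
password = dbutils.secrets.get(scope="confluent", key="kafka-password")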


5 REPLIES


julie
New Contributor III

Hey Hubert,

Firstly, thanks for your response. Sure, I'll try this instead.

For kafka.group.id, that'll have to come from Confluent Cloud, right?

Thanks,

Julie

Hubert-Dudek
Esteemed Contributor III

It is a name you define to control the offsets. I think that once you put it in your Databricks code, it will be created in Kafka/Confluent automatically.
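For background (a sketch assuming the inputDF from the accepted solution above; the checkpoint path is hypothetical): Structured Streaming tracks the offsets it has consumed in the query's checkpoint directory rather than in the Kafka consumer group, so the group id is essentially just a label you pick.

# offsets are recovered from the checkpoint on restart,
# not from the Kafka consumer group
query = (inputDF.writeStream
    .format("console")  # dump incoming rows to the driver log for a quick look
    .option("checkpointLocation", "/tmp/checkpoints/confluent_demo")
    .start())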

julie
New Contributor III

Thanks Hubert, it runs fine and it looks like it is creating an input DataFrame. How do I check whether it is consuming the correct topic? Any thoughts?

julie
New Contributor III

Or does this even mean it's able to access my topic?

Thanks for your time Hubert. I really appreciate it.

Julie
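One quick way to check (a sketch, assuming the inputDF from the accepted solution above): select the topic column along with the key and value cast to strings, and display the stream. If records from your topic show up, the connection and subscription are working.

display(inputDF.selectExpr(
    "topic",                # confirms which topic each record came from
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "timestamp"))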
