Data Engineering

Connecting Confluent Cloud to Databricks

Mbinyala
New Contributor II

Hi!!

Can someone tell me how to connect Confluent Cloud to Databricks? I am new to this, so please elaborate in your answer.

2 REPLIES

Anonymous
Not applicable

Here's a step-by-step guide to connecting Confluent Cloud to Databricks:

Step 1: Set up a Confluent Cloud Cluster

  • Sign up for a Confluent Cloud account at https://confluent.cloud/ and create a new cluster if you haven't already.
  • Once your cluster is ready, note down the following information:
    • Bootstrap Servers (e.g., kafka-brokers.example.com:9092)
    • API Key and Secret (for authentication)

Step 2: Configure Databricks

  • In your Databricks workspace, create a new notebook or open an existing one.
  • Store the connection details using the Databricks secret scope feature. Secrets allow you to securely store sensitive information like credentials. Note that secret scopes and secrets are not created with dbutils inside a notebook; you create them with the Databricks CLI (or the REST API) and then read them in the notebook with dbutils.secrets.get. For example, with the legacy Databricks CLI syntax (the scope and key names below are placeholders):

    databricks secrets create-scope --scope confluent
    databricks secrets put --scope confluent --key bootstrap-servers --string-value "kafka-brokers.example.com:9092"
    databricks secrets put --scope confluent --key api-key --string-value "<API_KEY>"
    databricks secrets put --scope confluent --key api-secret --string-value "<API_SECRET>"

  • The security protocol (SASL_SSL) and SASL mechanism (PLAIN) are fixed values for Confluent Cloud, so they can be set directly as Kafka options in the notebook (see Step 3) rather than stored as secrets.
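A quick sanity check from the notebook, assuming the placeholder scope name "confluent" used above, is to list the scope and its keys; secret values themselves are redacted in notebook output:

    # List the available secret scopes and the keys stored in the "confluent" scope
    print(dbutils.secrets.listScopes())
    print(dbutils.secrets.list("confluent"))

    # dbutils.secrets.get returns the value for use in code; printed output is redacted
    api_key = dbutils.secrets.get(scope="confluent", key="api-key")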

Step 3: Create a Streaming DataFrame in Databricks

  • In the same Databricks notebook, you can now create a Streaming DataFrame to consume data from Confluent Cloud. Here's an example code snippet:

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StringType, DoubleType

    # Define the schema of the incoming data
    schema = StructType().add("name", StringType()).add("age", DoubleType())

    # Read the connection details stored in the secret scope created in Step 2
    kafka_bootstrap_servers = dbutils.secrets.get(scope="confluent", key="bootstrap-servers")
    api_key = dbutils.secrets.get(scope="confluent", key="api-key")
    api_secret = dbutils.secrets.get(scope="confluent", key="api-secret")

    # Read data from the Kafka topic; Confluent Cloud requires SASL_SSL with the PLAIN mechanism
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.sasl.jaas.config",
              f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{api_key}" password="{api_secret}";') \
      .option("subscribe", "topic-name") \
      .option("startingOffsets", "earliest") \
      .load()

    # Extract and process the data
    processed_df = df \
      .select(from_json(col("value").cast("string"), schema).alias("data")) \
      .select("data.name", "data.age")

    # Start the streaming query
    query = processed_df.writeStream \
      .outputMode("append") \
      .format("console") \
      .start()

    query.awaitTermination()
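In a Databricks notebook you can also preview the stream interactively instead of (or before) starting the console sink, since display() accepts a streaming DataFrame:

    # Interactive preview of the streaming DataFrame in a Databricks notebook
    display(processed_df)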

Step 4: Customize the code as per your requirements

  • Modify the code snippet above to suit your specific use case. Update the schema definition, Kafka topic name, and any data transformations or output sinks as needed (for example, writing to a Delta table instead of the console, as sketched below).
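As one common sink choice, here is a minimal sketch of writing the processed stream to a Delta table; the table name and checkpoint path are placeholders, not anything from the original post, and the checkpoint lets the stream recover its progress after a restart:

    # Write the processed stream to a Delta table with a checkpoint for restart recovery
    query = processed_df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/checkpoints/confluent-demo") \
        .toTable("confluent_demo_events")

    query.awaitTermination()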

VaibB
Contributor