
Connecting Confluent Cloud to Databricks

Mbinyala
New Contributor II

Hi!!

Can someone tell me how to connect Confluent Cloud to Databricks? I am new to this, so please explain your answer in detail.

2 REPLIES

Anonymous
Not applicable

Here's a step-by-step guide to connecting Confluent Cloud to Databricks:

Step 1: Set up a Confluent Cloud Cluster

  • Sign up for a Confluent Cloud account at https://confluent.cloud/ and create a new cluster if you haven't already.
  • Once your cluster is ready, note down the following information:
    • Bootstrap Servers (e.g., kafka-brokers.example.com:9092)
    • API Key and Secret (for authentication)
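  • (Optional) If you want to sanity-check these values before configuring Databricks, a minimal producer sketch with the confluent-kafka Python client can confirm the credentials work; the package install and the topic name "test-topic" are assumptions for illustration:

    # pip install confluent-kafka
    from confluent_kafka import Producer

    def on_delivery(err, msg):
        # err is None when the broker acknowledged the message
        if err is not None:
            print("delivery failed:", err)
        else:
            print("delivered to", msg.topic())

    producer = Producer({
        "bootstrap.servers": "kafka-brokers.example.com:9092",  # your Bootstrap Servers value
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<API_KEY>",
        "sasl.password": "<API_SECRET>",
    })
    producer.produce("test-topic", value=b"hello from confluent-kafka", on_delivery=on_delivery)
    producer.flush()  # waits until the delivery callback has fired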

Step 2: Configure Databricks

  • In your Databricks workspace, create a new notebook or open an existing one.
  • Store the Confluent Cloud connection details with the Databricks secret scope feature. Secrets let you keep sensitive values such as the API key and secret out of notebook code. Note that dbutils.secrets is read-only from a notebook, so the scope and its secrets are created outside the notebook, for example with the legacy Databricks CLI (the Secrets REST API or the newer CLI work as well):

    databricks secrets create-scope --scope <scope-name>

    databricks secrets put --scope <scope-name> --key kafka.bootstrap.servers --string-value "kafka-brokers.example.com:9092"
    databricks secrets put --scope <scope-name> --key kafka.security.protocol --string-value "SASL_SSL"
    databricks secrets put --scope <scope-name> --key kafka.sasl.mechanism --string-value "PLAIN"
    databricks secrets put --scope <scope-name> --key kafka.sasl.jaas.config --string-value "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<API_KEY>\" password=\"<API_SECRET>\";"
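  • Back in a Databricks notebook, you can confirm the scope and keys are visible with dbutils.secrets; the scope name below is a placeholder for whatever you created above, and secret values themselves are redacted in notebook output:

    # Run in a notebook cell to verify the secrets from Step 2
    scope_name = "<scope-name>"
    print(dbutils.secrets.list(scope_name))  # lists the keys stored in the scope
    servers = dbutils.secrets.get(scope=scope_name, key="kafka.bootstrap.servers")
    print(servers)  # the value is shown as [REDACTED] in the cell output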

Step 3: Create a Streaming DataFrame in Databricks

  • In the same Databricks notebook, you can now create a streaming DataFrame to consume data from Confluent Cloud. Here's an example code snippet:
    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StringType, DoubleType

    # Secret scope created in Step 2 (placeholder name)
    scope_name = "<scope-name>"

    # Define the schema of the incoming JSON data
    schema = StructType().add("name", StringType()).add("age", DoubleType())

    # Read the connection settings stored in the secret scope
    kafka_bootstrap_servers = dbutils.secrets.get(scope=scope_name, key="kafka.bootstrap.servers")
    kafka_security_protocol = dbutils.secrets.get(scope=scope_name, key="kafka.security.protocol")
    kafka_sasl_mechanism = dbutils.secrets.get(scope=scope_name, key="kafka.sasl.mechanism")
    kafka_sasl_jaas_config = dbutils.secrets.get(scope=scope_name, key="kafka.sasl.jaas.config")

    # Read data from the Kafka topic on Confluent Cloud; the SASL options are
    # required for authenticating against the cluster
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
      .option("kafka.security.protocol", kafka_security_protocol) \
      .option("kafka.sasl.mechanism", kafka_sasl_mechanism) \
      .option("kafka.sasl.jaas.config", kafka_sasl_jaas_config) \
      .option("subscribe", "topic-name") \
      .option("startingOffsets", "earliest") \
      .load()

    # Parse the JSON payload and select the fields
    processed_df = df \
      .select(from_json(col("value").cast("string"), schema).alias("data")) \
      .select("data.name", "data.age")

    # Start the streaming query, printing micro-batches to the console
    query = processed_df.writeStream \
      .outputMode("append") \
      .format("console") \
      .start()

    query.awaitTermination()
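  • To preview the parsed stream interactively inside the notebook instead of using the console sink, you can also pass the streaming DataFrame to Databricks' display():

    # Renders a live, continuously updating preview of the stream in the notebook
    display(processed_df)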

Step 4: Customize the code as per your requirements

  • Modify the code snippet above to suit your specific use case. Update the schema definition, Kafka topic name, and any data transformations or output sinks as needed.
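  • For example, a common change is to write the stream to a Delta table instead of the console; the table name and checkpoint path below are placeholders to adapt to your workspace:

    # Write the parsed stream to a Delta table; a checkpoint location is required
    # so the query can recover its progress after a restart
    query = (processed_df.writeStream
             .outputMode("append")
             .option("checkpointLocation", "/tmp/checkpoints/confluent_demo")
             .toTable("confluent_events"))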

             

           

VaibB
Contributor