Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live tables stream output to Kafka

SharathE
New Contributor III

Hello,

I wanted to know whether we can write the stream output to a Kafka topic in a DLT pipeline.

Please let me know.

Thank you.

4 REPLIES

mtajmouati
Contributor

Hi,

Yes, you can write the stream output to a Kafka topic in a Databricks Delta Live Tables (DLT) pipeline. Here's how you can do it:

  • Set Up Kafka Configuration: Ensure you have the necessary Kafka configurations such as Kafka broker URL, topic name, and security settings (if any).
  • Create a DLT Pipeline: Set up a Delta Live Table pipeline in Databricks.
  • Define the Stream: In your DLT pipeline, define the streaming source. For example, this could be a stream from a file, a Delta table, or another streaming source.
  • Write to Kafka: Use the writeStream method with Kafka options to send the stream to a Kafka topic.

Here's a basic example in PySpark:

[code example attached as an image: carbon.png]

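The four steps above can be sketched roughly as follows. This is a hedged sketch, not the code from the attached image: the broker address, source table name, and checkpoint path are placeholder assumptions, and the streaming part is wrapped in a function so nothing runs until you call it with a live SparkSession.

```python
# Sketch of the steps above (all names and paths are placeholders).

def kafka_sink_options(bootstrap_servers, topic, checkpoint_path):
    """Collect the writeStream options the Kafka sink needs."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
        "checkpointLocation": checkpoint_path,
    }

def start_stream_to_kafka(spark, source_table, bootstrap_servers, topic, checkpoint_path):
    """Read a Delta table as a stream and publish each row as JSON to Kafka."""
    # 1. Define the stream: here, a Delta table read as a streaming source
    df = spark.readStream.table(source_table)
    # 2. The Kafka sink expects a string/binary `value` column (and optionally `key`)
    out = df.selectExpr("CAST(null AS STRING) AS key", "to_json(struct(*)) AS value")
    # 3. Write to Kafka via writeStream
    return (
        out.writeStream.format("kafka")
        .options(**kafka_sink_options(bootstrap_servers, topic, checkpoint_path))
        .start()
    )
```

Calling `start_stream_to_kafka(spark, "my_schema.events", "broker1:9092", "my_topic", "/tmp/ckpt")` on a cluster would start the continuous stream; the checkpoint location is what lets the query resume after a restart.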
koarjun
New Contributor II

Is it possible to have two notebooks in a DLT pipeline, with the first notebook reading from topic1 in Kafka and writing to a Delta Live Table, and the second notebook reading from that table, applying some transformations, and writing the stream to topic2 in Kafka, all in streaming mode?
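The two-notebook layout described here could be sketched as below. This is a hedged sketch with assumed table and topic names; it only runs inside a DLT pipeline, where `spark` and the `dlt` module are provided. Note that the final write from the transformed table out to topic2 is typically done either as a separate Structured Streaming job reading the pipeline's output table, or via DLT sinks where that feature is available, since classic DLT tables are the pipeline's outputs.

```python
# Notebook 1 (sketch): ingest Kafka topic1 into a DLT streaming table.
import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze_events")
def bronze_events():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker
        .option("subscribe", "topic1")
        .load()
        .select(col("value").cast("string").alias("json_value"))
    )

# Notebook 2 (sketch): read the table above, streaming, and transform it.
@dlt.table(name="silver_events")
def silver_events():
    return dlt.read_stream("bronze_events")  # apply your transformations here
```

Both notebooks attach to the same pipeline, and DLT keeps the whole chain incremental as long as each step reads the upstream table as a stream.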

SharathE
New Contributor III

Is there any sample code snippet for connecting with ScramLoginModule?

I'm using the code below to push data to a Kafka topic and I'm getting this error:

Job aborted due to stage failure: Task 15 in stage 879.0 failed 4 times, most recent failure: Lost task 15.3 in stage 879.0 (TID 9663) (executor 0):
kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465)
    at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290)
    at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:273)

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.scram.ScramLoginModule
 
code:

df1 = df.selectExpr("CAST(null AS STRING) as key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker details") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';") \
    .option("topic", "topic1") \
    .option("checkpointLocation", "checkpointname") \
    .option("kafka.metadata.max.age.ms", "120000") \
    .start()

mtajmouati
Contributor

Hi ! 

Ensure your code is set up to use these libraries. Here is the complete example:

  • Navigate to your cluster configuration:
    • Go to your Databricks workspace.
    • Click on "Clusters" and select your cluster.
    • Go to the "Libraries" tab.
  • Install the necessary Maven libraries:
    • Click on "Install New".
    • Choose "Maven" as the library source.
    • Add the following Maven coordinates:
      • org.apache.kafka:kafka-clients:2.8.0
      • org.apache.kafka:kafka_2.13:2.8.0

df1 = df.selectExpr("CAST(null AS STRING) as key", "to_json(struct(*)) AS value")

df1.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "your_broker_details") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='your_username' password='your_password';") \
    .option("topic", "your_topic1") \
    .option("checkpointLocation", "/path/to/your/checkpoint") \
    .option("kafka.metadata.max.age.ms", "120000") \
    .start()
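One more detail worth checking: the "unable to find LoginModule class" error means the SCRAM login module is not visible to the producer's classloader. Notice that the stack trace classes carry a kafkashaded. prefix, because Databricks ships a shaded Kafka client; a commonly reported fix is therefore to reference the shaded class name in the JAAS string when using the built-in connector, and the unshaded name when a separately installed kafka-clients library is the one actually loaded. A small helper to build the string (the shaded flag and which prefix your cluster needs are assumptions to verify against your runtime):

```python
def scram_jaas_config(username, password, shaded=True):
    """Build the kafka.sasl.jaas.config string for SASL/SCRAM auth.

    shaded=True uses the kafkashaded.-prefixed class name that Databricks'
    bundled Kafka connector exposes; shaded=False uses the plain Apache
    Kafka class name for a separately installed kafka-clients library.
    (Assumption: pick whichever matches the client your cluster loads.)
    """
    prefix = "kafkashaded." if shaded else ""
    return (
        f"{prefix}org.apache.kafka.common.security.scram.ScramLoginModule "
        f"required username='{username}' password='{password}';"
    )
```

The result is passed as the value of the kafka.sasl.jaas.config option in the writeStream call above; the trailing semicolon is required by the JAAS syntax.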

Mehdi TAJMOUATI
WyTaSoft
