Delta Live Tables stream output to Kafka
07-08-2024 01:29 PM
Hello,
I wanted to know if we can write the stream output to a Kafka topic in a DLT pipeline?
Please let me know.
Thank you.
07-08-2024 04:03 PM
Hi,
Yes, you can write the stream output to a Kafka topic in a Databricks Delta Live Tables (DLT) pipeline. Here’s how you can do it:
- Set Up Kafka Configuration: Ensure you have the necessary Kafka configurations such as Kafka broker URL, topic name, and security settings (if any).
- Create a DLT Pipeline: Set up a Delta Live Table pipeline in Databricks.
- Define the Stream: In your DLT pipeline, define the streaming source. For example, this could be a stream from a file, a Delta table, or another streaming source.
- Write to Kafka: Use the writeStream method with Kafka options to send the stream to a Kafka topic.
Here’s a basic example in PySpark:
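(The sketch below uses placeholder values; the source table name, broker address, topic, and checkpoint path are assumptions, not part of the original reply.)
# Hypothetical streaming source: a Delta table read as a stream
df = spark.readStream.table("my_source_table")

# Kafka expects "key" and "value" columns; serialize each row as a JSON string
out = df.selectExpr("CAST(null AS STRING) AS key", "to_json(struct(*)) AS value")

# Write the stream to a Kafka topic (placeholder broker, topic, and checkpoint values)
(out.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("topic", "my_topic")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())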
09-18-2024 09:17 AM
Is it possible to have two notebooks in a DLT pipeline, with the first notebook reading from topic1 in Kafka and writing to a DLT table, and the second notebook reading from that DLT table, applying some data transformations, and streaming the result to topic2 in Kafka? All in streaming mode?
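(A rough sketch of that layout; all table, broker, and topic names below are placeholders, and whether the final write to topic2 runs inside the DLT-managed pipeline or as a separate streaming job is worth confirming against the DLT documentation.)
Notebook 1 - ingest topic1 into a DLT table:
import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze_topic1")
def bronze_topic1():
    # Read topic1 as a stream and keep the payload as a string column
    return (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092")
        .option("subscribe", "topic1")
        .load()
        .select(col("value").cast("string").alias("value")))

Notebook 2 - transform the DLT table; the transformed stream can then be written to topic2 with writeStream as shown in the earlier reply:
import dlt
from pyspark.sql.functions import upper, col

@dlt.table(name="silver_topic1")
def silver_topic1():
    # Hypothetical transformation: upper-case the payload
    return dlt.read_stream("bronze_topic1").withColumn("value", upper(col("value")))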
07-09-2024 09:28 AM - edited 07-09-2024 09:42 AM
Any sample code snippet for connecting with ScramLoginModule?
I'm using the code below to push data to a Kafka topic and I'm getting an error:
# df is the streaming DataFrame being written out
df.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker details") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';") \
.option("topic", "topic1") \
.option("checkpointLocation", "checkpointname") \
.option("kafka.metadata.max.age.ms", "120000") \
.start()
07-10-2024 04:04 AM - edited 07-10-2024 04:19 AM
Hi!
Ensure your cluster has the Kafka client libraries installed and that your code is set up to use them. Here is a complete example:
- Navigate to your cluster configuration:
  - Go to your Databricks workspace.
  - Click on "Clusters" and select your cluster.
  - Go to the "Libraries" tab.
- Install the necessary Maven libraries:
  - Click on "Install New".
  - Choose "Maven" as the library source.
  - Add the following Maven coordinates:
    - org.apache.kafka:kafka-clients:2.8.0
    - org.apache.kafka:kafka_2.13:2.8.0
# Serialize each row as a JSON string in the "value" column (Kafka key left null)
df1 = df.selectExpr("CAST(null AS STRING) as key", "to_json(struct(*)) AS value")

# Write the stream to Kafka over SASL_SSL with SCRAM-SHA-512 authentication
df1.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "your_broker_details") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='your_username' password='your_password';") \
.option("topic", "your_topic1") \
.option("checkpointLocation", "/path/to/your/checkpoint") \
.option("kafka.metadata.max.age.ms", "120000") \
.start()
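(Side note, not part of the original reply: rather than hard-coding the SCRAM credentials in the JAAS string, they could be pulled from a Databricks secret scope; the scope and key names below are placeholders.)
# Hypothetical secret scope/keys holding the Kafka credentials
kafka_user = dbutils.secrets.get("my_scope", "kafka_user")
kafka_password = dbutils.secrets.get("my_scope", "kafka_password")
jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f"username='{kafka_user}' password='{kafka_password}';"
)
# Then pass jaas_config to .option("kafka.sasl.jaas.config", jaas_config)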
Mehdi TAJMOUATI

