cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

SDP continuous mode

lachu
Visitor

Hi,

I was doing a POC and hence used open source spark and kafka in docket container and got it working. The sample code is ingesting data from kafka but it is running only in batch mode. Not able to continuously ingest the kafka stream

Question: Can we create streaming  continuous pipeline using open source spark?

 

Thanks

2 REPLIES 2

bala_sai
New Contributor

Yes, we can build a continuous streaming pipeline using open source Spark. The main thing is to use Spark Structured Streaming, not a normal batch read. For Kafka streaming, we need to use spark.readStream, then write using writeStream, and keep the query running with awaitTermination().

Sample code (python):

# Read continuously from Kafka
kafka_df = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "your_topic_name")
.option("startingOffsets", "latest")
.load()
)

# Convert Kafka key/value from binary to string
parsed_df = (
kafka_df
.select(
col("key").cast("string").alias("key"),
col("value").cast("string").alias("value"),
col("timestamp")
)
)

# Write stream output
query = (
parsed_df.writeStream
.format("delta") # can be changed to parquet/delta etc.
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoints/kafka_stream_poc")
.start()
)

# Keeps the streaming job alive
query.awaitTermination()

Hmm. This looks more like imperative programming than sdp

 

Would you be able to give me a sample with @DP.table?