SDP continuous mode
Monday
Hi,
I was doing a POC, so I used open source Spark and Kafka in Docker containers and got it working. The sample code ingests data from Kafka, but it only runs in batch mode; I am not able to continuously ingest the Kafka stream.
Question: Can we create a continuous streaming pipeline using open source Spark?
Thanks
Monday
Yes, we can build a continuous streaming pipeline using open source Spark. The key is to use Spark Structured Streaming instead of a normal batch read: a batch read (spark.read) processes the data currently in the topic once and then stops. For Kafka streaming, read with spark.readStream, write with writeStream, and keep the query running with awaitTermination().
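Sample code (Python):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assumes the Kafka connector is on the classpath, e.g. started with
# --packages org.apache.spark:spark-sql-kafka-0-10_<scala_version>:<spark_version>
spark = SparkSession.builder.appName("kafka_stream_poc").getOrCreate()

# Read continuously from Kafka
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "your_topic_name")
    .option("startingOffsets", "latest")
    .load()
)

# Convert Kafka key/value from binary to string
parsed_df = kafka_df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
    col("timestamp"),
)

# Write stream output. "delta" needs the delta-spark package;
# "parquet" works with plain open source Spark.
query = (
    parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/kafka_stream_poc")
    .start("/tmp/output/kafka_stream_poc")  # placeholder path; file sinks need one
)

# Keeps the streaming job alive
query.awaitTermination()
```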
Monday
Hmm. This looks more like imperative programming than SDP.
Would you be able to give me a sample with @dp.table?
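For reference, a minimal sketch of the same Kafka ingestion written declaratively with Spark Declarative Pipelines (SDP). It assumes Apache Spark 4.1 or later, where the pyspark.pipelines module and the spark-pipelines CLI are available, and that the Kafka connector is on the classpath. The file path and the dataset names kafka_raw and kafka_parsed are hypothetical. Instead of calling writeStream and awaitTermination, each function returns a streaming DataFrame, and the @dp.table decorator declares it as a streaming table; the pipeline runner, not your code, is expected to start the queries and manage their checkpoints. This file is meant to be loaded by the pipeline runner, not executed directly with python.

```python
# transformations/kafka_ingest.py (hypothetical file in the pipeline project)
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

# Assumes the pipeline runner has already created the active session
spark = SparkSession.active()


@dp.table  # streaming table: the function returns a streaming DataFrame
def kafka_raw() -> DataFrame:
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "your_topic_name")
        .option("startingOffsets", "latest")
        .load()
    )


@dp.table  # downstream streaming table that reads kafka_raw by name
def kafka_parsed() -> DataFrame:
    return (
        spark.readStream.table("kafka_raw")
        .select(
            col("key").cast("string").alias("key"),
            col("value").cast("string").alias("value"),
            col("timestamp"),
        )
    )
```

The project would then be run with the spark-pipelines CLI (for example spark-pipelines run from the project directory that holds the pipeline spec file). The spec format, and how your Spark version treats long-running streaming flows, should be checked in the SDP programming guide rather than taken from this sketch.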