Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

SDP continuous mode

lachu
New Contributor

Hi,

I was doing a POC, so I used open-source Spark and Kafka in a Docker container and got it working. The sample code ingests data from Kafka, but it runs only in batch mode. I am not able to continuously ingest the Kafka stream.

Question: Can we create a continuous streaming pipeline using open-source Spark?

Thanks

4 REPLIES

bala_sai
New Contributor

Yes, we can build a continuous streaming pipeline using open source Spark. The main thing is to use Spark Structured Streaming, not a normal batch read. For Kafka streaming, we need to use spark.readStream, then write using writeStream, and keep the query running with awaitTermination().

Sample code (Python):

from pyspark.sql.functions import col

# Assumes `spark` is an active SparkSession (as in the pyspark shell)

# Read continuously from Kafka
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "your_topic_name")
    .option("startingOffsets", "latest")
    .load()
)

# Convert Kafka key/value from binary to string
parsed_df = (
    kafka_df
    .select(
        col("key").cast("string").alias("key"),
        col("value").cast("string").alias("value"),
        col("timestamp"),
    )
)

# Write stream output
query = (
    parsed_df.writeStream
    .format("delta")  # needs the delta-spark package; can be changed to "parquet" or "console"
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/kafka_stream_poc")
    .option("path", "/tmp/output/kafka_stream_poc")  # example path; file sinks require one
    .start()
)

# Keeps the streaming job alive
query.awaitTermination()

lachu
New Contributor

Hmm. This looks more like imperative programming than SDP.

Would you be able to give me a sample with @dp.table?

Yogasathyandrun
New Contributor

Yes, you can build a continuously running streaming pipeline on open-source Spark.

Structured Streaming (imperative)

For Kafka, the standard approach is readStream → writeStream, kept alive with awaitTermination():

kafka_df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("subscribe", "your_topic_name")
         .load()
)

query = (
    kafka_df.writeStream
            .format("delta")                              # or "console" for a quick POC check
            .outputMode("append")
            .option("checkpointLocation", "/tmp/checkpoints/kafka_stream")
            .option("path", "/tmp/output/kafka_stream")   # required for file sinks
            .start()
)
query.awaitTermination()

A POC usually behaves like a batch job for one of two reasons: it uses spark.read.format("kafka") (batch) instead of spark.readStream.format("kafka"), or it has a one-time trigger like .trigger(availableNow=True) (the newer form of .trigger(once=True)), which drains the topic once and stops. With readStream + writeStream + awaitTermination() and no one-time trigger, Spark keeps consuming new messages as they arrive.
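
To make the trigger difference concrete, here is a minimal sketch. It assumes a hypothetical helper, trigger_kwargs (not part of PySpark), that maps an intent to the keyword arguments accepted by DataStreamWriter.trigger(). The mode names, the 10-second default interval, and the parquet sink are illustrative assumptions.

```python
def trigger_kwargs(mode: str, interval: str = "10 seconds") -> dict:
    """Map an intent to keyword arguments for DataStreamWriter.trigger().

    Hypothetical helper for illustration; the mode names are made up.
    """
    if mode == "always_on":
        # Micro-batches fire on a fixed interval until the query is stopped.
        return {"processingTime": interval}
    if mode == "drain_and_stop":
        # Processes whatever is available at start, then the query terminates.
        return {"availableNow": True}
    raise ValueError(f"unknown mode: {mode!r}")


def start_stream(df, mode: str, checkpoint: str, path: str):
    """Start a streaming write on a streaming DataFrame with the chosen trigger."""
    return (
        df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("checkpointLocation", checkpoint)
        .option("path", path)
        .trigger(**trigger_kwargs(mode))
        .start()
    )
```

With "always_on", follow start_stream(...) with awaitTermination() so the driver stays up. With "drain_and_stop", the query ends on its own, which is what makes a streaming job look like a batch run. Leaving .trigger() out altogether also keeps the query running, with each micro-batch starting as soon as the previous one finishes.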

Spark Declarative Pipelines (@dp.table)

In the declarative equivalent, you define the datasets and let the framework handle execution, checkpointing, and orchestration:

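from pyspark import pipelines as dp
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the session explicitly rather than relying on a predefined `spark` global
spark = SparkSession.active()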

# Bronze - raw Kafka stream
@dp.table(name="kafka_events")
def kafka_events():
    return (
        spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "kafka:9092")
             .option("subscribe", "your_topic_name")
             .load()
             .select(
                 col("key").cast("string").alias("key"),
                 col("value").cast("string").alias("value"),
                 col("timestamp")
             )
    )

# Silver - simple transformation
@dp.table(name="events_clean")
def events_clean():
    return (
        spark.readStream.table("kafka_events")
             .filter(col("value").isNotNull())
    )

Note there's no writeStream.start() / awaitTermination() inside the definitions; the framework manages that.

Running an SDP pipeline with spark-pipelines run performs a triggered/incremental update in open-source Spark 4.1. The always-on continuous mode isn't in the open-source framework yet; it's a Databricks Lakeflow extension. So dp.table gives you the declarative model, but for a process that runs forever, the Structured Streaming version above is still the route.
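
If periodic refreshes are acceptable, one rough way to approximate an always-on pipeline is to re-run the triggered update in a loop. The sketch below is only that: run_updates, its parameters, and the project directory are hypothetical names, and it assumes spark-pipelines is on the PATH and picks up the pipeline spec from the working directory. It is not a substitute for a true continuous mode.

```python
import subprocess
import time


def run_updates(project_dir, pause_seconds=10.0, max_runs=None,
                runner=subprocess.run, sleep=time.sleep):
    """Re-run `spark-pipelines run` until a run fails or max_runs is reached.

    Returns the number of successful runs. `runner` and `sleep` are
    injectable so the loop can be exercised without Spark installed.
    """
    completed = 0
    while max_runs is None or completed < max_runs:
        # Each invocation performs one triggered update of the pipeline.
        result = runner(["spark-pipelines", "run"], cwd=project_dir)
        if result.returncode != 0:
            break  # stop looping on the first failed update
        completed += 1
        sleep(pause_seconds)
    return completed
```

This relies on the streaming tables keeping their checkpoints under the pipeline's storage location, so that each run resumes where the previous one stopped. That is worth verifying on your own setup before depending on it.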

If you can share the Kafka ingestion code you're currently using, and whether you're targeting Structured Streaming or the pyspark.pipelines SDP framework, it'd be easier to pinpoint why the current run behaves like a batch job.

Data Engineer | Apache Spark | Delta Lake | Databricks

lachu
New Contributor

Sample code that I used:

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType

spark = SparkSession.active()

@dp.temporary_view()
def event_stream() -> DataFrame:
    """Stream order events from Kafka."""
    schema = StructType([       
        StructField("event_id", IntegerType()),
        StructField("timestamp", StringType()),
        StructField("user_id", StringType()),
        StructField("event_type", StringType()),
        StructField("product", StringType()),
        StructField("price", DecimalType(8,2)),
        StructField("quantity", IntegerType()),
    ])

    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:29092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest") 
        .load()
        .select(f.from_json(f.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

# The @dp.table decorator combined with spark.readStream marks this as a streaming table
@dp.table(format='delta')
def bronze_events() -> DataFrame:
    """Persist the stream into a Delta Bronze layer continuously."""
    # Correct open-source SDP syntax to read an upstream view as a stream
    return spark.readStream.table('event_stream')

Sample config:

name: StreamTest_001
storage: "file:////opt/spark/data/checkpoints/StreamTest_001"
catalog: spark_catalog
database: default

libraries:
  - glob:
      include: "./stream.py"

configuration:
  spark.remote: "sc://localhost:15002"
  spark.sql.shuffle.partitions: "2"
  pipelines.trigger.type: "ProcessingTime"
  pipelines.trigger.interval: "10 seconds"