<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: SDP continuous mode in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160239#M54872</link>
    <pubDate>Tue, 23 Jun 2026 11:22:35 GMT</pubDate>
    <dc:creator>Yogasathyandrun</dc:creator>
    <dc:date>2026-06-23T11:22:35Z</dc:date>
    <item>
      <title>SDP continuous mode</title>
      <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160156#M54861</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I was doing a POC, so I used open-source Spark and Kafka in a Docker container and got it working. The sample code ingests data from Kafka, but it only runs in batch mode; I am not able to ingest the Kafka stream continuously.&lt;/P&gt;&lt;P&gt;Question: Can we create a continuous streaming pipeline using open-source Spark?&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;</description>
      <pubDate>Mon, 22 Jun 2026 20:21:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160156#M54861</guid>
      <dc:creator>lachu</dc:creator>
      <dc:date>2026-06-22T20:21:42Z</dc:date>
    </item>
    <item>
      <title>Re: SDP continuous mode</title>
      <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160168#M54862</link>
      <description>&lt;P&gt;Yes, we can build a continuous streaming pipeline using open-source Spark. The main thing is to use Spark Structured Streaming, not a normal batch read. For Kafka streaming, read with spark.readStream, write with writeStream, and keep the query running with awaitTermination().&lt;/P&gt;&lt;P&gt;Sample code (Python):&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assumes the Kafka connector (spark-sql-kafka-0-10) is on the classpath;
# the "delta" sink below additionally needs the delta-spark package.
spark = SparkSession.builder.appName("kafka_stream_poc").getOrCreate()

# Read continuously from Kafka
kafka_df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("subscribe", "your_topic_name")
         .option("startingOffsets", "latest")
         .load()
)

# Convert Kafka key/value from binary to string
parsed_df = kafka_df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
    col("timestamp"),
)

# Write stream output
query = (
    parsed_df.writeStream
             .format("delta")  # can be changed to "parquet", "console", etc.
             .outputMode("append")
             .option("checkpointLocation", "/tmp/checkpoints/kafka_stream_poc")
             .option("path", "/tmp/output/kafka_stream_poc")  # file sinks need an output path
             .start()
)

# Keeps the streaming job alive
query.awaitTermination()&lt;/PRE&gt;</description>
      <pubDate>Tue, 23 Jun 2026 02:55:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160168#M54862</guid>
      <dc:creator>bala_sai</dc:creator>
      <dc:date>2026-06-23T02:55:37Z</dc:date>
    </item>
    <item>
      <title>Re: SDP continuous mode</title>
      <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160175#M54865</link>
      <description>&lt;P&gt;Hmm. This looks more like imperative programming than SDP.&lt;/P&gt;&lt;P&gt;Would you be able to give me a sample with @dp.table?&lt;/P&gt;</description>
      <pubDate>Tue, 23 Jun 2026 04:29:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160175#M54865</guid>
      <dc:creator>lachu</dc:creator>
      <dc:date>2026-06-23T04:29:13Z</dc:date>
    </item>
    <item>
      <title>Re: SDP continuous mode</title>
      <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160239#M54872</link>
      <description>&lt;P&gt;Yes, you can build a continuously running streaming pipeline on open-source Spark.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Structured Streaming (imperative)&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;For Kafka, the standard approach is &lt;STRONG&gt;readStream&lt;/STRONG&gt; → &lt;STRONG&gt;writeStream&lt;/STRONG&gt;, kept alive with &lt;STRONG&gt;awaitTermination()&lt;/STRONG&gt;:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka_stream").getOrCreate()

kafka_df = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "kafka:9092")
         .option("subscribe", "your_topic_name")
         .load()
)

query = (
    kafka_df.writeStream
            .format("delta")                              # needs the delta-spark package; or "console" for a quick POC check
            .outputMode("append")
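            # Optional, illustrative interval: start a micro-batch every 10 seconds.
            # Omitting .trigger() also keeps the query running, starting each
            # micro-batch as soon as the previous one finishes; only one-time
            # triggers (availableNow=True / once=True) stop after draining the topic.
            .trigger(processingTime="10 seconds")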
            .option("checkpointLocation", "/tmp/checkpoints/kafka_stream")
            .option("path", "/tmp/output/kafka_stream")   # required for file sinks
            .start()
)
query.awaitTermination()&lt;/PRE&gt;&lt;P&gt;A POC usually behaves like a batch job for one of two reasons: it uses &lt;STRONG&gt;spark.read.format("kafka")&lt;/STRONG&gt; (batch) instead of &lt;STRONG&gt;spark.readStream.format("kafka")&lt;/STRONG&gt;, or it sets a one-time trigger such as &lt;STRONG&gt;.trigger(availableNow=True)&lt;/STRONG&gt; (the newer replacement for &lt;STRONG&gt;.trigger(once=True)&lt;/STRONG&gt;), which processes the data available at start-up and then stops. With &lt;STRONG&gt;readStream + writeStream + awaitTermination()&lt;/STRONG&gt; and no one-time trigger, Spark keeps consuming new messages as they arrive.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Spark Declarative Pipelines (@dp.table)&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;The declarative equivalent: you define the datasets and let the framework handle execution, checkpointing, and orchestration.&lt;/P&gt;&lt;PRE&gt;from pyspark import pipelines as dp
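from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.active()  # session provided when spark-pipelines runs this file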

# Bronze - raw Kafka stream
@dp.table(name="kafka_events")
def kafka_events():
    return (
        spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "kafka:9092")
             .option("subscribe", "your_topic_name")
             .load()
             .select(
                 col("key").cast("string").alias("key"),
                 col("value").cast("string").alias("value"),
                 col("timestamp")
             )
    )

# Silver - simple transformation
@dp.table(name="events_clean")
def events_clean():
    return (
        spark.readStream.table("kafka_events")
             .filter(col("value").isNotNull())
    )&lt;/PRE&gt;&lt;P&gt;Note that there is no writeStream.start() or awaitTermination() inside the definitions; the framework manages the streaming queries.&lt;/P&gt;&lt;P&gt;In open-source Spark 4.1, running an SDP pipeline with &lt;STRONG&gt;spark-pipelines run&lt;/STRONG&gt; performs a single triggered, incremental update: each streaming table processes the data that is available, and then the run finishes. The always-on continuous mode is not in the open-source framework yet; it is a Databricks Lakeflow extension. So &lt;STRONG&gt;@dp.table&lt;/STRONG&gt; gives you the declarative model, but for a process that runs indefinitely, the Structured Streaming version above is still the way to go.&lt;/P&gt;&lt;P&gt;If you can share the Kafka ingestion code you're currently using, and whether you're targeting Structured Streaming or the pyspark.pipelines SDP framework, it would be easier to pinpoint why the current run behaves like a batch job.&lt;/P&gt;</description>
      <pubDate>Tue, 23 Jun 2026 11:22:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160239#M54872</guid>
      <dc:creator>Yogasathyandrun</dc:creator>
      <dc:date>2026-06-23T11:22:35Z</dc:date>
    </item>
    <item>
      <title>Re: SDP continuous mode</title>
      <link>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160276#M54874</link>
      <description>&lt;P&gt;Sample code that I used:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType

spark = SparkSession.active()

@dp.temporary_view()
def event_stream() -&amp;gt; DataFrame:
    """Stream order events from Kafka."""
    schema = StructType([       
        StructField("event_id", IntegerType()),
        StructField("timestamp", StringType()),
        StructField("user_id", StringType()),
        StructField("event_type", StringType()),
        StructField("product", StringType()),
        StructField("price", DecimalType(8,2)),
        StructField("quantity", IntegerType()),
    ])

    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:29092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest") 
        .load()
        .select(f.from_json(f.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

# The @dp.table decorator combined with spark.readStream marks this as a streaming table
@dp.table(format='delta')
def bronze_events() -&amp;gt; DataFrame:
    """Persist the stream into a Delta Bronze layer continuously."""
    # Correct open-source SDP syntax to read an upstream view as a stream
    return spark.readStream.table('event_stream')&lt;/LI-CODE&gt;&lt;P&gt;Sample config:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;name: StreamTest_001
storage: "file:///opt/spark/data/checkpoints/StreamTest_001"
catalog: spark_catalog
database: default

libraries:
  - glob:
      include: "./stream.py"

configuration:
  spark.remote: "sc://localhost:15002"
  spark.sql.shuffle.partitions: "2"
  pipelines.trigger.type: "ProcessingTime"
  pipelines.trigger.interval: "10 seconds"&lt;/LI-CODE&gt;</description>
      <pubDate>Tue, 23 Jun 2026 15:07:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sdp-continuous-mode/m-p/160276#M54874</guid>
      <dc:creator>lachu</dc:creator>
      <dc:date>2026-06-23T15:07:25Z</dc:date>
    </item>
  </channel>
</rss>

