lachu
New Contributor

Sample code that i used

from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType

spark = SparkSession.active()

@dp.temporary_view()
def event_stream() -> DataFrame:
    """Stream order events from Kafka."""
    schema = StructType([       
        StructField("event_id", IntegerType()),
        StructField("timestamp", StringType()),
        StructField("user_id", StringType()),
        StructField("event_type", StringType()),
        StructField("product", StringType()),
        StructField("price", DecimalType(8,2)),
        StructField("quantity", IntegerType()),
    ])

    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:29092")
        .option("subscribe", "events")
        .option("startingOffsets", "earliest") 
        .load()
        .select(f.from_json(f.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

# The @DP.table decorator combined with spark.readStream marks this as a streaming table
	(format='delta')
def bronze_events() -> DataFrame:
    """Persist the stream into a Delta Bronze layer continuously."""
    # Correct open-source SDP syntax to read an upstream view as a stream
    return spark.readStream.table('event_stream')

Sample config 

name: StreamTest_001
storage: "file:////opt/spark/data/checkpoints/StreamTest_001"
catalog: spark_catalog
database: default

libraries:
  - glob:
      include: "./stream.py"

configuration:
  spark.remote: "sc://localhost:15002"
  spark.sql.shuffle.partitions: "2"
  pipelines.trigger.type: "ProcessingTime"
  pipelines.trigger.interval: "10 seconds"