Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
yesterday
Sample code that I used:
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession, functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType
# Module-level session handle shared by the dataset functions below;
# resolved once, when this file is loaded.
spark = SparkSession.active()
@dp.temporary_view()
def event_stream() -> DataFrame:
    """Expose the Kafka ``events`` topic as a streaming view of typed columns.

    Each record's ``value`` is cast to a string, parsed as JSON against the
    event schema defined below, and flattened so that every schema field
    becomes a top-level column of the returned DataFrame.
    """
    # Layout of the JSON payload carried in each Kafka message value.
    event_fields = [
        StructField("event_id", IntegerType()),
        StructField("timestamp", StringType()),
        StructField("user_id", StringType()),
        StructField("event_type", StringType()),
        StructField("product", StringType()),
        StructField("price", DecimalType(8,2)),
        StructField("quantity", IntegerType()),
    ]
    event_schema = StructType(event_fields)

    # Streaming source: subscribe to the topic starting from the earliest offsets.
    reader = spark.readStream.format("kafka")
    reader = reader.option("kafka.bootstrap.servers", "kafka:29092")
    reader = reader.option("subscribe", "events")
    reader = reader.option("startingOffsets", "earliest")
    raw_records = reader.load()

    # Parse the message value into a single struct column named "data" ...
    parsed_payload = f.from_json(f.col("value").cast("string"), event_schema)
    nested = raw_records.select(parsed_payload.alias("data"))

    # ... then promote the struct's fields to top-level columns.
    return nested.select("data.*")
# The @dp.table decorator combined with spark.readStream marks this as a streaming table
@dp.table(format='delta')
def bronze_events() -> DataFrame:
    """Persist the stream into a Delta Bronze layer continuously.

    Returns:
        A streaming DataFrame read from the upstream ``event_stream`` view.
    """
    # Read the upstream view by name through readStream so it is consumed
    # as a stream.
    return spark.readStream.table('event_stream')


# Sample config
# Pipeline spec for the StreamTest_001 pipeline.
name: StreamTest_001
# NOTE(review): four slashes after "file:" — "file:///opt/..." is the usual
# form for an absolute local path; confirm this is intentional.
storage: "file:////opt/spark/data/checkpoints/StreamTest_001"
catalog: spark_catalog
database: default
# Source files that define the pipeline's datasets.
libraries:
  - glob:
      include: "./stream.py"
# Settings passed as key/value configuration entries.
configuration:
  spark.remote: "sc://localhost:15002"
  spark.sql.shuffle.partitions: "2"
  pipelines.trigger.type: "ProcessingTime"
  pipelines.trigger.interval: "10 seconds"