If this is the first time you’re hearing about sinks in Lakeflow Declarative Pipelines (LDP), we highly recommend first reading Introducing the SDP Sink API, which explores the recently launched Sinks API and its ability to write data seamlessly to external systems such as event streams and Delta tables.
LDP provides a declarative framework designed to simplify and streamline pipeline development, whether you're processing real-time data with Streaming Tables or aggregating data efficiently using Materialized Views. The introduction of the Sinks API further enhances functionality by enabling seamless integration with external systems.
This blog delves into the support for foreachBatch in LDP, a feature from Apache Spark Structured Streaming that facilitates arbitrary batch operations on streaming data, enabling complex transformations and writes to various data sinks.
For simplicity and reproducibility, this blog post uses the same data source as the Sinks API blog: the clickstream dataset from Databricks Datasets. You can think of this data as a map of the internet, illustrating where visitors navigate and how they move from one page to another. To keep things straightforward, we'll assume a one-to-one relationship between page IDs and titles, with titles remaining static. Using this dataset, we'll explore three key use cases:
1. Aggregating total page visits across all origins and upserting running totals into a Delta table.
2. Identifying high-traffic pages and appending them to a SQL Server table for consumption by external systems.
3. Capturing visits to a specific page and archiving them as Parquet files in a Unity Catalog volume.
By the end of this post, you’ll see how LDP can elevate your data processing workflow. We’ll walk through how to efficiently handle streaming data in real time, seamlessly merge it with existing Delta tables, and dynamically route it to multiple destinations!
We’ll start by importing the necessary modules and creating a Streaming Table to ingest the Wikipedia clickstream data in JSON format. Using the @dp.table API, we’ll stream data directly from the source location. You can also configure the pipeline to stream from any source supported by Auto Loader, including event streams; it’s as simple as switching the format and providing the necessary authorization.
from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dp.table(
    comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
    table_properties={"quality": "bronze"}
)
def clickstream_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", "true")
        .load(json_path)
    )
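To illustrate how little changes when you switch sources, here is a hedged sketch of the same table reading from a Kafka event stream instead of cloud files. The broker address, topic name, and column handling below are placeholders (not from this post), and the table definition is wrapped in a builder function only so the sketch stays self-contained outside a running pipeline:

```python
# Hypothetical variant of clickstream_raw that ingests from Kafka instead of
# cloud files. Broker address and topic are placeholders; in practice you
# would supply credentials (e.g., via Databricks Secrets) as extra options.
def build_kafka_clickstream_table(spark, dp):
    @dp.table(comment="Raw clickstream events ingested from a Kafka topic.")
    def clickstream_raw():
        return (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "broker-1:9092")  # placeholder
            .option("subscribe", "clickstream-events")           # placeholder topic
            .option("startingOffsets", "earliest")
            .load()
            # Kafka delivers the payload as binary; decode the JSON value.
            .selectExpr("CAST(value AS STRING) AS json_payload", "timestamp")
        )

    return clickstream_raw
```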
With our clickstream data now streaming into the raw table, the next step is to refine it by applying data quality constraints and transformations. This includes standardizing data types and filtering out invalid records. We achieve this by leveraging LDP’s native expectations to enforce data quality and maintain pipeline integrity.
@dp.table(
    comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
    table_properties={"quality": "silver"}
)
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dp.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
    return (
        spark.readStream.table("clickstream_raw")
        .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
    )
With our cleaned data in a Streaming Table, we can now implement our three use cases using @dp.foreach_batch_sink, LDP's new sink that brings foreachBatch-style processing into LDP. This allows streaming data to be handled as micro-batches while integrating with the regular DataFrame APIs.
In Spark Structured Streaming, data is processed as a series of micro-batches. Each micro-batch contains a specific set of rows, represented as a DataFrame. During processing, this DataFrame, along with the corresponding batchId, is passed to the function registered with the sink. We will use the DataFrame df, which contains the rows from a micro-batch, to implement our use cases.
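One reason the batchId matters: micro-batches can be replayed after a failure, and the batchId is what lets a sink make its writes idempotent. Here is a minimal sketch in plain Python (no Spark; the in-memory ledger is a stand-in for whatever transactional store your sink uses):

```python
# Minimal sketch of batchId-based idempotency: a foreachBatch-style function
# records each batch id it has committed and skips replays. The in-memory
# structures stand in for a transactional store such as a Delta table.
committed_batches = set()
sink_rows = []

def foreach_batch(rows, batch_id):
    if batch_id in committed_batches:
        return  # replayed micro-batch: already written, skip it
    sink_rows.extend(rows)
    committed_batches.add(batch_id)

# Batch 1 is delivered twice, simulating a retry after a failure.
foreach_batch([{"page": "A", "clicks": 3}], batch_id=1)
foreach_batch([{"page": "A", "clicks": 3}], batch_id=1)  # skipped as a replay
foreach_batch([{"page": "B", "clicks": 5}], batch_id=2)
```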
In the first use case, we aggregate total page visits at the sink across all origins. Since each micro-batch may contain multiple rows for the same current_page_id, we first aggregate the data at the micro-batch level. If an existing value is present in the sink, we update it accordingly; otherwise, we insert a new record.
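Conceptually, the per-batch aggregate-then-merge logic can be sketched in plain Python. In this illustration, dicts stand in for the micro-batch DataFrame and the Delta sink; the Spark version uses a real MERGE:

```python
from collections import defaultdict

# Running totals in the sink, keyed by page id (stand-in for the Delta table).
sink = {}

def merge_batch(rows):
    # Step 1: aggregate within the micro-batch, since the same page id can
    # appear on multiple rows of a single batch.
    batch_totals = defaultdict(int)
    titles = {}
    for r in rows:
        batch_totals[r["current_page_id"]] += r["click_count"]
        titles[r["current_page_id"]] = r["current_page_title"]
    # Step 2: MERGE semantics -- update existing keys, insert new ones.
    for page_id, count in batch_totals.items():
        if page_id in sink:
            sink[page_id]["click_count"] += count    # whenMatchedUpdate
        else:
            sink[page_id] = {"current_page_title": titles[page_id],
                             "click_count": count}   # whenNotMatchedInsert

merge_batch([
    {"current_page_id": 1, "current_page_title": "Main_Page", "click_count": 10},
    {"current_page_id": 1, "current_page_title": "Main_Page", "click_count": 5},
])
merge_batch([
    {"current_page_id": 1, "current_page_title": "Main_Page", "click_count": 2},
    {"current_page_id": 2, "current_page_title": "Spark", "click_count": 7},
])
```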
The second use case filters on click_count to identify high-traffic pages within each time interval and appends this data to a table called dbo.high_traffic_pages in a SQL Server database for consumption by external systems. We use Databricks Secrets to store the connection string and use it to authenticate with SQL Server.
For the final use case, we’ll apply a simple filter on the DataFrame to capture page visits to New York City and securely archive this data as Parquet files to a volume in Unity Catalog. Access to the archived data can then be managed and controlled directly through Unity Catalog.
@dp.foreach_batch_sink(name="all_the_sinks")
def foreachBatchFunc(df, batchId):
    # Use case 1: Aggregate page-level click counts into a Delta sink.
    agg_df = df.groupBy("current_page_id", "current_page_title").agg(sum("click_count").alias("click_count"))
    out = DeltaTable.forName(df.sparkSession, "harsha_pasala.default.clickstream_sink")
    (out.alias("target")
        .merge(agg_df.alias("source"), "source.current_page_id = target.current_page_id")
        .whenMatchedUpdate(
            set={"click_count": col("target.click_count") + col("source.click_count")}
        )
        .whenNotMatchedInsert(
            values={
                "current_page_id": "source.current_page_id",
                "current_page_title": "source.current_page_title",
                "click_count": "source.click_count",
            }
        )
        .execute())

    # Use case 2: Append high-traffic pages to SQL Server.
    high_traffic_pages = df.filter(col("click_count") > 1000000)
    (high_traffic_pages.write
        .format("jdbc")
        .option("url", dbutils.secrets.get(scope="secret-lab", key="sql_connection_string"))
        .option("dbtable", "dbo.high_traffic_pages")
        .mode("append")
        .save())

    # Use case 3: Monitor a specific page and archive its rows as Parquet.
    new_york_clicks = df.filter(col("current_page_title") == "New_York_City")
    new_york_clicks.write.format("parquet").mode("append").save("/Volumes/path/")
Our data is ready and the sink is in place; it’s time to put it into action. We’ll use the append_flow API to direct data from the clickstream_clean Streaming Table to the foreach_batch_sink we created earlier, all_the_sinks. This ensures that data is efficiently processed and routed according to our defined use cases.
@dp.append_flow(
    target="all_the_sinks",
    name="sink_flow"
)
def read_data():
    return spark.readStream.table("clickstream_clean")
Finally, by integrating clickstream data ingestion, processing with quality constraints, and multiple sinks within an LDP pipeline, we arrive at the following DAG from the pipeline run. While the DAG displays a single sink named all_the_sinks, note that this represents a foreach_batch_sink, which dispatches data to all three configured downstream targets.
Over the past year, LDP has introduced numerous enhancements, with @dp.foreach_batch_sink being one of the latest additions. This feature enables advanced streaming capabilities while building on the existing sinks and append flow APIs, allowing for a cleaner and more streamlined implementation. For more information about the API and sample implementations, please refer to the docs.