Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
aleksandra_ch
Databricks Employee

Spark Declarative Pipelines “How-To” Series

Introduction

Lakeflow Spark Declarative Pipelines (SDP) is a framework for building scalable, maintainable ETL pipelines. Moving from traditional imperative ETL, such as standard Spark or Pandas, to a declarative approach saves time and effort and removes a whole class of headaches. This series of blog posts makes the transition easy by providing the solutions and best practices you need to get started.

By adopting a declarative approach, you focus exclusively on business logic. SDP automatically handles the heavy lifting—recomputing, retries, persistence, and state management—eliminating the redundant boilerplate code. 

Let’s walk through an example.

Consider a common task: ingesting Change Data Capture (CDC) logs to maintain a Slowly Changing Dimension (SCD) Type 1 table. You need to upsert records based on a key, handle deletes, and ensure only the latest version of a record is kept.
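Before looking at any Spark code, it helps to pin down the target semantics: for each key, the record with the highest ts wins, and a delete event removes the key entirely (SCD Type 1 keeps no history). A minimal pure-Python sketch of that "latest wins" logic, with hypothetical data and field names:

```python
# Hypothetical CDC events: each carries a key, a sequence timestamp,
# a payload, and a delete flag.
events = [
    {"key": "u1", "ts": 1, "name": "Ann",  "is_deleted": False},
    {"key": "u1", "ts": 3, "name": "Anna", "is_deleted": False},
    {"key": "u2", "ts": 2, "name": "Bob",  "is_deleted": False},
    {"key": "u2", "ts": 4, "name": None,   "is_deleted": True},
]

def apply_scd1(events):
    """Replay events in ts order; latest record per key wins, deletes drop the key."""
    state = {}
    for e in sorted(events, key=lambda e: e["ts"]):
        if e["is_deleted"]:
            state.pop(e["key"], None)   # SCD Type 1: hard delete, no history kept
        else:
            state[e["key"]] = e
    return state

final = apply_scd1(events)
# "u1" survives with its latest name; "u2" is gone.
```

Both snippets below implement exactly this contract; the difference is how much of the mechanics you have to write yourself.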

 

Spark imperative code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, struct, max_by

def upsert_to_delta(micro_batch_df: DataFrame, batch_id: int):
    # Deduplicate within the micro-batch to get the latest record per key
    (micro_batch_df
     .groupBy("key")
     .agg(max_by(struct("*"), col("ts")).alias("row"))
     .select("row.*")
     .createOrReplaceTempView("updates"))

    # Execute the MERGE logic
    # Note: Replace the ellipses with your actual column mapping
    micro_batch_df.sparkSession.sql("""
        MERGE INTO cdc_data_raw t
        USING updates s
        ON s.key = t.key
        WHEN MATCHED AND s.is_deleted = true THEN 
            UPDATE SET DELETED_AT = now()
        WHEN MATCHED THEN UPDATE SET 
            A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
            B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END,
            -- ... for every column ...
            ts = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END
        WHEN NOT MATCHED THEN INSERT *
    """)

# Start the stream
(cdcData.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("append")
    .start()
)


Spark Declarative Pipelines code:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
    return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
    target = "target",
    source = "users",
    keys = ["key"],
    sequence_by = col("ts"),
    apply_as_deletes = expr("is_deleted = True")
)

In an imperative approach, this would require a complex, multi-line MERGE statement and manual state management. In SDP, the AUTO CDC flow handles the mechanics automatically. You focus only on the business logic. Bingo!

Of course, AUTO CDC is just one building block of the framework—there are a few other simple patterns to keep in mind to get the most out of your pipelines. Let’s start with the fundamentals: how to save results into a table in SDP.

 

Part 1. How to Save Results into a Table with Spark Declarative Pipelines

In the traditional Spark world, we typically end every script with a "command." We transform our data, and then we tell Spark exactly what to do with it: df.write.saveAsTable("sales").

When you first switch to Lakeflow SDP, you might get stuck. You’ve written your transformation, you're ready to persist the data, and you reach for your trusty .write.save(). Except, it’s not there. You have the data, but how do you actually make it land in a table?

In fact, in SDP, explicitly calling a "write" action isn't needed.

The Shift: From Action to Declaration

The shift is simple: in an imperative world, you send data to a table. In a declarative world, you define a table as the result of a function or a SQL query (SDP supports both Python and SQL).

Instead of manually coding steps to follow, you simply use a decorator. Note that, in Python SDP, every table declaration is a Python function which returns a Spark DataFrame:

The Imperative Way:

(spark.readStream.table("raw_sales")
    .filter("amount > 0")
    .writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/sales_report")
    .outputMode("append")
    .toTable("sales_report")
)

The SDP Way:
from pyspark import pipelines as dp

@dp.table
def sales_report():
    return (spark.readStream
             .table("raw_sales")
             .filter("amount > 0")
            )
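As mentioned above, SDP supports SQL as well as Python. As a sketch using Databricks' declarative SQL syntax (treat the exact keywords as an assumption to verify against your runtime's docs), the same streaming table could be declared as:

```
CREATE OR REFRESH STREAMING TABLE sales_report AS
SELECT * FROM STREAM(raw_sales)
WHERE amount > 0;
```

Either way, there is no write call anywhere: the declaration itself is the instruction to persist.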

 

Consider a more complex example to illustrate a daily aggregation over sales data:

The Imperative Way:

This approach forces you to step outside the main logic flow into a "helper" function, manually managing the sink and checkpoints.

# The foreachBatch processing function
def process_daily_chunk(batch_df, batch_id):
    # Perform aggregation within the batch.
    # Note: this would NOT handle late data.
    daily_summary = (batch_df
        .groupBy("store_id", "processing_date")
        .agg({"sales_amount": "sum"})
    )

    # Manually append to the target table
    (daily_summary.write.mode("append")
         .saveAsTable("daily_sales_stats"))

# The main stream management
(spark.readStream
    .table("raw_transactions")
    .writeStream
    .foreachBatch(process_daily_chunk)
    .option("checkpointLocation", "s3://bucket/checkpoints/daily_stats")
    .start()
)

The SDP Way:

The framework handles the "how." You only define the "what." Because you used @dp.materialized_view, SDP knows to handle state and incremental updates automatically.

from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.materialized_view
def daily_sales_stats():
    # You define the transformation; 
    # SDP handles the execution under the hood.
    # Note: late data is handled automatically.
    return (
        spark.read.table("raw_transactions")
        .groupBy("store_id", "processing_date")
        .agg(F.sum("sales_amount").alias("total_sales"))
    )
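The practical difference between the two is easiest to see with late data. A tiny pure-Python model of what each approach computes, using hypothetical rows where a "late" day-1 transaction arrives in the second micro-batch:

```python
from collections import defaultdict

# Hypothetical transactions: (store_id, processing_date, sales_amount),
# arriving across two micro-batches; the last row of batch_2 is late day-1 data.
batch_1 = [("s1", "2024-01-01", 10.0), ("s1", "2024-01-01", 5.0)]
batch_2 = [("s1", "2024-01-02", 7.0), ("s1", "2024-01-01", 3.0)]

# Imperative foreachBatch style: aggregate each batch independently, append results.
appended_rows = []
for batch in (batch_1, batch_2):
    per_batch = defaultdict(float)
    for store, day, amount in batch:
        per_batch[(store, day)] += amount
    appended_rows.extend(per_batch.items())
# ("s1", "2024-01-01") now appears twice, as two partial sums: 15.0 and 3.0.

# Declarative materialized-view style: one aggregation over the full table.
full = defaultdict(float)
for store, day, amount in batch_1 + batch_2:
    full[(store, day)] += amount
# ("s1", "2024-01-01") appears once, with the correct total of 18.0.
```

This is only a model of the semantics, not of how SDP refreshes the view internally, but it shows why the batch-local approach leaves duplicate partial rows that you would have to merge yourself.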

By adding @dp.table or @dp.materialized_view, you are telling the framework: "This function represents a physical table. Make it so." Note that, by default, the name of the table is inferred from the function name.

If you’re wondering, "Wait, when do I use a @dp.table versus a @dp.materialized_view?"—don't worry! I've used both in the examples above, and it’s perfectly normal if the distinction feels blurry right now. They look similar, but they handle data quite differently under the hood. We will dive into the differences in the next post.

Key Takeaway

As you start your journey with SDP, perform a regular "self-check" on your code:

"Am I focusing on the business logic, or am I getting into implementation details?"

If you find yourself figuring out save modes or foreachBatch semantics, you're likely still thinking imperatively. When you let the framework handle the "how," you know you're using SDP correctly.

Conclusion

SDP provides a standardized, managed framework for ETL pipelines. It automatically handles checkpoints, storage paths, and boilerplate code, giving you more room to focus on business logic and outcomes.

In the next post, we’ll dive deep into the Differences between Streaming Tables and Materialized Views, breaking down exactly when to use each one to keep your pipelines efficient. Stay tuned!

3 Comments
Louis_Frolio
Databricks Employee

@aleksandra_ch ,

 

Great start to this “How-To” series on Spark Declarative Pipelines.

I appreciate the clear emphasis on persisting results into tables and, more importantly, the mindset shift toward declarative patterns. That transition—from imperative Spark workflows to SDP—is where many practitioners need clarity, and this lays a solid foundation.

Looking forward to the next installments.

Cheers, Louis.

antoalphi
New Contributor III

I appreciate the clear explanation and the emphasis on focusing more on business logic rather than implementation details.

Have a question regarding the behavior of @DP.table.

From my observations, @DP.table often results in a materialized view, even in cases where similar logic could be implemented using a Delta table.

I understand that Declarative Pipelines emphasize defining what the dataset should be rather than how it should be managed. However, I’m trying to better understand the design decision here.

Specifically:

  • What is the rationale behind favoring materialized views in these scenarios?
  • Under what conditions does @DP.table create a Delta table instead?
  • Are there recommended best practices for influencing or controlling this behavior when needed?

Would appreciate any insights, especially from those who have implemented this in production.

 

aleksandra_ch
Databricks Employee

Hi @antoalphi ,

You are correct that a Materialized View's logic can be implemented with a regular Delta table (outside of SDP). What Materialized Views bring you:

  • Automatic lineage: with a regular Delta table you have to wire up dependencies manually through tasks in a Workflow; in SDP this is done automatically.
  • An integrated data quality framework.
  • Incremental updates when run on serverless SDP. For instance, if you have a regular Delta table:
    CREATE OR REPLACE TABLE tx_customers AS
    SELECT * FROM transactions t
    LEFT JOIN customers c ON t.customer_id = c.customer_id
    then appending a single new row to transactions and re-running this statement recalculates the join for all transactions, even though only one changed. An equivalent Materialized View on serverless SDP:
    CREATE OR REPLACE MATERIALIZED VIEW tx_customers AS
    SELECT * FROM transactions t
    LEFT JOIN customers c ON t.customer_id = c.customer_id
    will update/insert only that one transaction. It handles all the complex logic of incremental updates under the hood so you don't have to do it yourself.
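The incremental-refresh point can be modeled in a few lines of plain Python (toy data, no Spark; all names hypothetical): joining only the appended row ends in the same state as a full recompute, while touching far less data.

```python
# Toy model of the tx/customers join maintained two ways.
customers = {"c1": "Ann", "c2": "Bob"}
transactions = [("t1", "c1"), ("t2", "c2")]

def full_join(transactions, customers):
    """Full recompute: join every transaction (what CREATE OR REPLACE TABLE does)."""
    return [(tx, cid, customers.get(cid)) for tx, cid in transactions]

tx_customers = full_join(transactions, customers)

# Incremental update: when one row is appended, join only that row —
# conceptually what the serverless materialized-view refresh does for you.
new_row = ("t3", "c1")
transactions.append(new_row)
tx_customers.append((new_row[0], new_row[1], customers.get(new_row[1])))

# Same final state as recomputing the whole join from scratch.
assert tx_customers == full_join(transactions, customers)
```

The real incremental maintenance logic (handling updates and deletes on either side of the join) is far more involved, which is exactly why it is valuable to have the framework do it.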

A @dp.table will always result in a Materialized View. 

As a side note, a Materialized View is actually backed by Delta as well, but it has these extra features listed above.

Hope it helps!

Best regards,