Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
aleksandra_ch
Databricks Employee

How to: Master Streaming Tables and Materialized Views

Make sure to check out the previous post: https://community.databricks.com/t5/technical-blog/spark-declarative-pipelines-how-to-series-part-1-...

Welcome to the second post of the Lakeflow Spark Declarative Pipelines (SDP) “How-To” Series! In the previous post, we saw how SDP lets us focus entirely on business logic by automating persistence, checkpoints, and state management.

I can hear you asking:

"Okay, I get it now. I don’t have to call an explicit .save(). My code is cleaner, my logic is decoupled from the plumbing, and I’m finally thinking declaratively. But now I’m looking at these decorators—@dp.table and @dp.materialized_view—and they both seem to create tables. Wait, there’s another one, @dp.temporary_view? Which one do I use for my Bronze layer? Which one is for the Gold layer?"

In this post, we’ll dive into these building blocks and show how to choose the right one for a specific use case.

The Basics: It’s All About the Flow

In the declarative world, we don't just create tables; we define the flow that hydrates them. Think of a flow as a "data contract" between your source and destination.

A flow determines two things:

  • Which data is moving (defined by a query or a DataFrame).
  • How it lands (Append, Upsert, or Replace).

A flow is not just a query — it is a stateful, incremental computation with zero plumbing. It tracks what has already been processed, so that on each run only new or changed data is handled. This is what allows incremental semantics, whether you run it in streaming or batch mode. 

In SDP, flows are the core of your pipeline. Sometimes they are defined implicitly through a decorator, and sometimes you define them explicitly to handle more complex logic.

Finally, each flow has its own destination—a physical table in Unity Catalog (UC), or a sink to an external system.

Virtual Logic: The @dp.temporary_view

Before we persist anything, let's talk about Views. Not every step in your pipeline needs to be a physical table.

The @dp.temporary_view decorator allows you to define temporary views. These are perfect for structuring your code and making it more modular and readable. They cost nothing in storage, are ephemeral, and live only within their pipeline, where you can easily reference them by name. Temporary views are useful when you want to break a pipeline into smaller transformation steps without creating extra physical tables — the results are then persisted downstream into Streaming Tables or Materialized Views.

Here's how you'd typically create a reusable transformation. Notice how the SDP version wraps the same logic in a decorator, turning it into a named, referenceable pipeline object:

Spark imperative code:

cleaned_sales_logs = (
    spark.readStream.table("raw_logs")
    .filter("amount > 0")
    .withColumnRenamed("ts", "event_timestamp")
)

Spark Declarative Pipelines:

@dp.temporary_view
def cleaned_sales_logs():
    return (
        spark.readStream.table("raw_logs")
        .filter("amount > 0")
        .withColumnRenamed("ts", "event_timestamp")
    )
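To see how a temporary view plugs into the rest of a pipeline, here is a minimal sketch of a Streaming Table that consumes the view above (the target table name is illustrative, not part of the original example):

```python
from pyspark import pipelines as dp

# Hypothetical downstream target: the view itself is never persisted;
# only this Streaming Table is. The view is referenced by name,
# exactly as if it were a table.
@dp.table
def bronze_sales_logs():
    return spark.readStream.table("cleaned_sales_logs")
```

Because the view is resolved inside the pipeline, you can reuse it from several downstream tables without recomputing or storing it separately.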

The Append Flow

Imagine you have a Kafka topic or a directory of raw JSON files. Your goal is to land them into your Bronze layer as quickly as possible, perhaps with light, row-by-row transformations—like column formatting, adding a CASE WHEN statement, or joining with a static reference table. This is the classic Append Flow. Because the Append Flow is stateful, it automatically tracks which data has already been processed. Only new rows are ingested on each run — no manual bookkeeping required.

A Streaming Table (@dp.table) is the primary target for an Append Flow.

This is the direct equivalent of the Spark Structured Streaming writeStream API with .outputMode("append").

When you use the @dp.table decorator, SDP implicitly creates an Append Flow for you:

@dp.table
def bronze_orders():
    return spark.readStream.table("raw_kafka_orders")

However, you can also define the flow explicitly if you want more control over the source-to-target relationship. This is helpful when you have multiple flows targeting the same Streaming Table:

# First, declare the target destination
dp.create_streaming_table("bronze_orders") 

# Then, explicitly define the first flow that hydrates it
@dp.append_flow(target="bronze_orders")
def flow_cleaned_sales_logs():
    # This reads from the temporary view we defined above
    return dp.read_stream("cleaned_sales_logs")

# Define the schema for Kafka JSON messages
# order_schema = StructType([StructField("order_id", StringType()), ...])

# Define a second flow that hydrates from a Kafka topic
@dp.append_flow(target="bronze_orders")
def append_kafka_orders():
   return (
       spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "<broker-host>:9092")
       .option("subscribe", "orders_topic")
       .option("startingOffsets", "latest")
       .load()
       .select(F.from_json(F.col("value").cast("string"), order_schema).alias("data"))
       .select("data.*")
   )

What if your Append Flow needs to write to an external database or Kafka topic instead of a table? For arbitrary sinks, SDP provides the dp.create_sink() function and the @dp.foreach_batch_sink() decorator. We will take a deep dive into custom sinks in a future post.

The Append Once Flow

I know what you're thinking: "I really like this streaming approach, but I can't move to it. All my old data is sitting in a legacy table, and my Kafka topic retention is only 7 days. If I start a stream now, I lose my history. How do I bridge the gap?"

This is the perfect use case for the Append Once Flow. You can use a one-time batch flow to hydrate your table with legacy data, and then let the Append flow take over for the fresh data.

# 1. Declare the target table
dp.create_streaming_table("bronze_orders")

# 2. THE BACKFILL: Fill the table once from your legacy Parquet data
@dp.append_flow(target="bronze_orders", once=True)
def backfill_historical():
    return (
         spark.read.format("parquet")
         .load("/Volumes/catalog/schema/historical_data")
    )

# 3. THE STREAM: Let the real-time flow handle everything moving forward
@dp.append_flow(target="bronze_orders")
def stream_cleaned_sales_logs():
    return spark.readStream.table("cleaned_sales_logs")

The AUTO CDC Flow

What if your source isn't just a list of events, but a stream of database changes? You have updates and deletes, and you need your target table to reflect the current state. This is the classic Slowly Changing Dimension (SCD) Type 1 or Type 2 scenario.

In the traditional approach, this meant writing a massive MERGE INTO block inside a foreachBatch. Worse, you had to repeat that same boilerplate for every single pipeline you built.

In SDP, we replace that with the AUTO CDC Flow. And what is the physical target of an AUTO CDC Flow? It's the Streaming Table.

To use it, you first declare your target as a streaming table, and then use the function dp.create_auto_cdc_flow().

Spark imperative code:

from pyspark.sql.functions import col, struct, max_by

def upsert_to_delta(micro_batch_df, batch_id):
    # Deduplicate within the micro-batch to get the latest record per key
    (micro_batch_df
     .groupBy("key")
     .agg(max_by(struct("*"), col("ts")).alias("row"))
     .select("row.*")
     .createOrReplaceTempView("updates"))
    # Execute the MERGE logic
    # Note: replace the ellipsis with your actual column mapping
    spark.sql("""
        MERGE INTO cdc_data_raw t
        USING updates s
        ON s.key = t.key
        WHEN MATCHED AND s.is_deleted = true THEN
            UPDATE SET DELETED_AT = now()
        WHEN MATCHED THEN UPDATE SET
            A = CASE WHEN s.ts > t.ts THEN s.a ELSE t.a END,
            B = CASE WHEN s.ts > t.ts THEN s.b ELSE t.b END,
            -- ... for every column ...
            ts = CASE WHEN s.ts > t.ts THEN s.ts ELSE t.ts END
        WHEN NOT MATCHED THEN INSERT *
    """)

# Start the stream
(cdcData.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("append")
    .start())

Spark Declarative Pipelines:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.temporary_view
def users():
    return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
    target = "target",
    source = "users",
    keys = ["key"],
    sequence_by = col("ts"),
    apply_as_deletes = expr("is_deleted = True")
)
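The example above overwrites rows in place (SCD Type 1). If you need to keep history instead, the Databricks AUTO CDC API documents a stored_as_scd_type parameter; here is a sketch under that assumption, with illustrative table names:

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

dp.create_streaming_table("target_history")

# SCD Type 2: instead of updating records in place, SDP keeps every
# version of a record and tracks its validity window for you.
dp.create_auto_cdc_flow(
    target = "target_history",
    source = "users",
    keys = ["key"],
    sequence_by = col("ts"),
    apply_as_deletes = expr("is_deleted = True"),
    stored_as_scd_type = "2"
)
```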

Materialized Views

Once your data is in Silver, you usually want to aggregate it for Gold—calculating daily revenue or active users. This is where Materialized Views (@dp.materialized_view) come into the picture.

A Materialized View is a special pipeline object that stores the result of a query.

Think of a Materialized View as a query acceleration mechanism. It persists the result of your query and automatically updates it incrementally whenever the source changes. The framework appends, upserts, and deletes in order to reflect the current truth.

To keep such aggregates current in traditional ETL, you usually have to choose between two difficult paths:

  • The Recompute Path: Whenever an update happens, you re-run the aggregation for that entire chunk of data (like a whole day or month). This gets more complex when handling late-arriving data.
  • The Watermark Path: You use watermarks to define a time window. The tradeoff? Any data arriving after the watermark is dropped and never considered.

Materialized Views solve this by acting as a "State" Mirror. They persist the query result and automatically update it incrementally. If data arrives late, SDP knows how to revisit that state and update the summary without you writing a single line of merge logic:

Spark imperative code:

def update_aggs(micro_batch_df, batch_id):
    # 1. Identify affected dates
    distinct_dates = (micro_batch_df
        .select("order_date").distinct().collect())
    date_list = [row.order_date for row in distinct_dates]

    if len(date_list) > 0:
        # 2. Recalculate affected chunks from the source
        new_aggs = (spark.read
            .table("silver_orders")
            .filter(F.col("order_date").isin(date_list))
            .groupBy("order_date")
            .agg(F.sum("amount").alias("total_revenue")))
        # 3. Sink: merge into the target
        new_aggs.createOrReplaceTempView("updates")
        spark.sql("""
            MERGE INTO daily_sales t
            USING updates s ON s.order_date = t.order_date
            WHEN MATCHED THEN UPDATE SET t.total_revenue = s.total_revenue
            WHEN NOT MATCHED THEN INSERT *
        """)

(spark.readStream.table("silver_orders")
    .writeStream
    .foreachBatch(update_aggs)
    .start())

Spark Declarative Pipelines:

@dp.materialized_view
def daily_sales():
    return (spark.read
        .table("silver_orders")
        .groupBy("order_date")
        .agg(F.sum("amount").alias("total_revenue")))

Let's clarify the difference. You might be wondering: "Why can't I just use an Append Flow and a Streaming Table for my aggregations?"

An Append Flow never looks back on what has already been processed — it only moves forward, appending new rows as they arrive. Because it never revisits past data, running a standard SUM or GROUP BY over an Append Flow would require the engine to maintain an ever-growing state of all previously seen rows, with no natural point at which to finalize the calculation.

To do aggregations, we need a finite table—a specific "state" or "snapshot" that represents the current truth. This is exactly why we use spark.read instead of spark.readStream inside a Materialized View. By using read, we tell SDP to treat the source as a full dataset to be queried, which the framework then intelligently optimizes to update incrementally behind the scenes.
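To make the contrast concrete, here is the same aggregation sketched against both read modes (illustrative table names; the streaming variant is described only to show the problem):

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# spark.read treats silver_orders as a finite snapshot; SDP maintains
# the result incrementally, so late updates are folded into the totals.
@dp.materialized_view
def daily_sales_mv():
    return (spark.read.table("silver_orders")
            .groupBy("order_date")
            .agg(F.sum("amount").alias("total_revenue")))

# With spark.readStream, the same groupBy would force the engine to hold
# unbounded state and could never revise totals it has already appended.
```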

Final Decision Tree

If you need to...

  • Land raw data from a streaming append-only source → @dp.table (Append Flow)
  • Process CDC data → dp.create_auto_cdc_flow (AUTO CDC Flow)
  • Run a one-time backfill → @dp.append_flow(once=True) (Append Once Flow)
  • Maintain aggregations / the current truth → @dp.materialized_view (N/A*)
  • Refactor code → @dp.temporary_view (N/A*)

* Remember, Materialized Views and temporary views are pipeline objects rather than flows. Flows define how data moves into a destination, while these objects define how data is stored or structured within the pipeline.

Key Takeaway

Flows are the core primitive of SDP — stateful, incremental computations that track what has been processed, so you only need to decide how data lands, and the framework handles the rest. Streaming Tables and Materialized Views are targets of the flows. Altogether, they intelligently handle technical complexity, allowing you to focus entirely on the business logic.

Conclusion

Today, we covered how Flows land data into Streaming Tables and how Materialized Views help accelerate queries. In our next post, we’ll look at the Consumer side: how to read from these objects, the nuances between spark.read and spark.readStream, and how to choose the right one for your specific scenario. Stay tuned!