
Stateless streaming with aggregations on a DLT/Lakeflow pipeline

rcostanza
New Contributor III

In a DLT pipeline I have a bronze table that ingests files using Autoloader, and a derived silver table that, for this example, just stores the number of rows for each file ingested into bronze. A basic code example:

import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze():
    # Autoloader ingestion (omitted in this example)
    ...

@dlt.table
def silver():
    # file_name is unique for each ingested file
    return dlt.read_stream("bronze").groupBy("file_name").agg(f.count(f.lit(1)).alias("total"))

Since all I'm doing is counting rows of newly added files, and files are uniquely named, this should be a stateless operation: count only the rows added within the current microbatch, ignore previous ones, and ideally avoid keeping a state store that would go unused.

But due to the aggregation, it'll be a stateful operation that will also account for previously ingested rows through the state store. This will result in an undesired complete table rewrite (output mode: complete).

A way to minimize this and limit the scope of the aggregation would be a watermark, but even a minimal one (1 second) prevents the most recent file from being aggregated until the next file arrives.

An alternative would be using foreachBatch, but that would live outside the pipeline. That's not ideal, since I want to run the pipeline continuously for the lowest latency.

Is there a way to achieve what I'm looking for: a derived DLT table that aggregates only the new rows in each microbatch, as a stateless operation?

ACCEPTED SOLUTION

mark_ott
Databricks Employee

Truly stateless microbatch aggregation is not possible in a Delta Live Tables (DLT) pipeline using the standard declarative operations: DLT is built on Spark Structured Streaming, which maintains state for aggregations, groupBy, and window functions so that incremental streaming computations remain correct across microbatches.

Why DLT Aggregations Are Stateful

  • When grouping and aggregating on columns (such as file_name), Spark Structured Streaming (the foundation for DLT) must keep track of all seen groups in its state store so it can update results if new rows for the same group arrive in future microbatches.

  • Without a watermark, a streaming aggregation cannot emit results in append mode, so the aggregate is maintained in complete fashion: the output table is rewritten and every previously seen group is retained, which is not stateless.

Watermark Limitations

  • Watermarks only help trim state for time-based aggregations and late data, but, as you noted, they can cause recently arrived data to be delayed, waiting for the watermark to "advance" before emitting results for new groups.

  • Using a short watermark mitigates state growth but does not make the operation truly stateless, and it introduces latency for recently added files; a sketch of the pattern follows below.
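
For illustration, a minimal sketch of the watermark approach, assuming bronze exposes an event/ingest timestamp column (here called ingest_time, a hypothetical name). The watermark plus time window bounds the state, but counts for a file are only emitted once the watermark passes its window:

import dlt
from pyspark.sql import functions as f

@dlt.table
def silver_windowed_counts():
    return (
        dlt.read_stream("bronze")
        # "ingest_time" is a placeholder timestamp column; the watermark bounds state
        .withWatermark("ingest_time", "10 seconds")
        # grouping by a time window lets the watermark evict state once the window closes
        .groupBy(f.window("ingest_time", "1 minute"), "file_name")
        .agg(f.count(f.lit(1)).alias("total"))
    )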

foreachBatch Drawback

  • foreachBatch can implement stateless logic (processing only the data in the current batch), but it is imperative rather than declarative, breaking DLT's managed pipeline model. It cannot be wrapped in a basic @dlt.table, so orchestration and lineage benefits are lost.

Alternative Approaches

There is no direct stateless aggregation within DLT's declarative API. The best alternatives are:

  • Use Delta MERGE: compute per-batch row counts (ideally staging them in a Delta table) and upsert ("merge") them into the silver table from foreachBatch. The aggregate stays up to date and each batch touches only newly ingested files, but this must be orchestrated outside the @dlt.table context; see the sketch after this list.

  • External Aggregation Logic: Execute Spark jobs outside DLT (using notebooks or jobs) to periodically compute row counts and write them to the target table, preserving statelessness.

  • DLT with Expectation: If the only metric is ingest rate, consider using DLT's built-in data quality and expectation features to track row counts or failures, although this does not create a custom silver table.
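
For reference, a minimal sketch of the foreachBatch/MERGE pattern as a separate streaming job (not a @dlt.table). The table names and checkpoint path are illustrative placeholders, not part of the original pipeline:

from delta.tables import DeltaTable
from pyspark.sql import functions as f

def upsert_counts(batch_df, batch_id):
    # Aggregate only the rows that arrived in this microbatch (no streaming state)
    counts = batch_df.groupBy("file_name").agg(f.count(f.lit(1)).alias("total"))
    target = DeltaTable.forName(spark, "main.default.silver_counts")  # placeholder name
    (target.alias("t")
        .merge(counts.alias("s"), "t.file_name = s.file_name")
        .whenMatchedUpdate(set={"total": "t.total + s.total"})
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream.table("main.default.bronze")  # placeholder name
    .writeStream
    .foreachBatch(upsert_counts)
    .option("checkpointLocation", "/Volumes/main/default/checkpoints/silver_counts")  # placeholder
    .trigger(availableNow=True)  # or processingTime="10 seconds" for near-real-time
    .start())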

Key Takeaways

  • Declarative aggregations in DLT (groupBy/agg) are always stateful because Spark must manage and update previously seen groups.

  • For purely stateless batch-level aggregation, imperative approaches (like foreachBatch or external processing) are required, at the expense of breaking DLT's pipeline model.

  • Watermarking and windowing help control state scope but do not eliminate state or latency artifacts for newly arrived data.

  • There is no supported DLT-native stateless grouping/aggregation on dynamically arriving unique keys (such as file_name) without maintaining state across microbatches.


5 REPLIES

rcostanza
New Contributor III

Gotcha. Thanks for the reply.

We've had a medallion architecture running in production for a while, based on DLT pipelines that hold most bronze/silver tables, plus separate jobs for the silver tables that are more complex to materialize. It works, but I'm not sure we're following best practices.

What would those best practices be, considering bronze tables are populated by autoloaders, and some silver tables will require aggregations? I imagine it's a pretty average scenario.

What if I wanted lower latency for silver tables, for those that can't be in a pipeline that I can run continuously? Would the best solution be jobs/notebooks leveraging `foreachBatch` that run until stopped?

mark_ott
Databricks Employee

For an average scenario in Databricks where bronze tables are populated by Autoloaders and silver tables require some aggregations, the following best practices are widely recommended:

Bronze Layer Best Practices

  • Land raw data in append-only Delta tables using Autoloader, with minimal transformation to preserve original data fidelity and auditability.

  • Store file metadata (e.g., source file name, load date) in the bronze tables to support lineage and troubleshooting; see the sketch after this list.

  • Use partitioning (e.g., by date) to optimize query performance and lifecycle management.

  • Maintain the rawest data possible (including duplicates and errors), and avoid deleting original files until successful landing and validation is confirmed.
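
To make the metadata bullet concrete, a minimal Autoloader sketch that lands raw data append-only and captures the source file name and load timestamp; the format, path, and column names are illustrative:

import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")          # placeholder format
        .load("/Volumes/main/raw/landing/")           # placeholder path
        .withColumn("file_name", f.col("_metadata.file_name"))  # source file metadata
        .withColumn("load_ts", f.current_timestamp())
    )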

Silver Layer Best Practices

  • Read only from bronze; never load silver tables directly from sources, as this improves reliability and schema change handling.

  • Clean, validate, deduplicate, and normalize data in silver tables. Enforce schema, handle nulls, fix data types, and standardize formats.

  • Perform joins to combine related datasets and gradually shape the data model (such as star or snowflake schemas). Heavy aggregations are usually reserved for gold.

  • When aggregations are required in silver (e.g., for rapid downstream access or business logic), use incremental processes such as MERGE or appropriate upserts, and clearly separate raw/validated tables from aggregated ones.

  • Use streaming reads from bronze whenever possible, reserving batch reads for small, infrequently changing tables.

  • For daily snapshot data, store all versions in bronze and deduplicate in silver using logic like ROW_NUMBER() to keep only the latest record per business key; see the sketch after this list.
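
A minimal sketch of that ROW_NUMBER() deduplication as a DLT table, keeping the newest record per business key; business_key and load_ts are placeholder column names:

import dlt
from pyspark.sql import functions as f
from pyspark.sql.window import Window

@dlt.table
def silver_latest():
    # Keep only the most recent version of each business key (placeholder columns)
    w = Window.partitionBy("business_key").orderBy(f.col("load_ts").desc())
    return (
        dlt.read("bronze")   # batch read over the accumulated snapshots
        .withColumn("rn", f.row_number().over(w))
        .filter("rn = 1")
        .drop("rn")
    )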

Additional General Recommendations

  • Use Unity Catalog to govern table access and lineage across all layers, and register each table for easy discoverability and governance.

  • Implement data quality checks and monitoring to ensure accuracy and detect drift, preferably at the silver layer or above.

  • Avoid irreversible transformations at the bronze stage so historic data can be reprocessed if downstream corrections are needed.

  • Design aggregations with consumption and scaling in mind, potentially pushing major aggregations to the gold layer for BI or ML workloads.

By following these practices, a robust, scalable, and future-proof Databricks lakehouse pipeline can be maintained, supporting both operational reliability and downstream analytical needs.

mark_ott
Databricks Employee

One more thing: turn off Delta Lake statistics collection (gathered on the first 32 columns by default) for the Bronze table. There is no need to spend write time collecting stats, since building Silver does a full table scan of Bronze anyway.
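
If the Bronze table is defined in DLT, one way to do this (a sketch; the same property can also be set with ALTER TABLE ... SET TBLPROPERTIES) is the delta.dataSkippingNumIndexedCols table property:

import dlt

@dlt.table(
    table_properties={
        # Collect data-skipping statistics on zero columns for this table
        "delta.dataSkippingNumIndexedCols": "0"
    }
)
def bronze():
    # Autoloader ingestion as before (omitted)
    ...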

mark_ott
Databricks Employee

For scenarios in Databricks where lower latency is needed for Silver tables but continuous streaming pipelines are not feasible, using jobs or notebooks that run foreachBatch in Structured Streaming mode is a common and recommended approach. Each micro-batch processes and writes data incrementally, giving more frequent updates than traditional batch jobs without requiring a fully continuous pipeline.

Why foreachBatch Is Used

  • Enables micro-batch processing for sinks (targets) that only support batch writes rather than full streaming writes.

  • Provides flexibility for upserts, deduplication, complex transformations, and custom logic that can't be expressed in straightforward streaming queries.

  • Can be triggered by jobs or notebooks and run until manually stopped, making it suitable for near-real-time updates where a persistent pipeline isn't needed or isn't possible.

Tradeoffs and Considerations

  • Latency depends on the trigger interval (how often micro-batches are processed); setting a lower interval (a few seconds rather than minutes) reduces Silver table latency.

  • This approach provides "at least once" guarantees unless deduplication or idempotence is explicitly handled (using batchId or primary-key logic); see the sketch after this list.

  • If frequent updates and low latency are a priority but a continuously running pipeline is too costly or complex, this method balances flexibility and performance for many Silver table scenarios.
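
To make that concrete, a minimal sketch of a foreachBatch job with a short processing-time trigger and Delta's idempotent-write options keyed on the batch id; the app id, table names, and checkpoint path are illustrative placeholders:

app_id = "silver_counts_writer"  # fixed, unique string per target table

def write_batch(batch_df, batch_id):
    # Idempotent Delta write: a replayed batch_id is skipped instead of duplicated
    (batch_df.write.format("delta")
        .mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", app_id)
        .saveAsTable("main.default.silver_counts"))  # placeholder name

(spark.readStream.table("main.default.bronze")  # placeholder name
    .writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/Volumes/main/default/checkpoints/silver_writer")  # placeholder
    .trigger(processingTime="10 seconds")  # lower interval = lower latency
    .start())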

Additional Optimization Tips

  • For even lower latency, tune the micro-batch trigger interval and manage Spark cluster resources to avoid queuing delays.

  • Consider file optimization options (such as compaction and optimized writes for Delta tables) to reduce read and write latency on Silver tables.

  • If latency is still insufficient, continuous pipelines (while more expensive) may ultimately be required for real-time use cases, but for most near-real-time cases the foreachBatch jobs/notebooks approach is effective and widely adopted.

In summary, running jobs or notebooks that use foreachBatch until stopped is generally the best solution for low-latency Silver tables when continuous pipelines are not viable or required.