Wednesday - last edited Wednesday
In a DLT pipeline I have a bronze table that ingests files using Autoloader, and a derived silver table that, for this example, just stores the number of rows for each file ingested into bronze. The basic code example:
import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze():
    # Autoloader code; illustrative example (format, path, and the file_name
    # metadata column are placeholders)
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("/path/to/source")
            .withColumn("file_name", f.col("_metadata.file_name")))

@dlt.table
def silver():
    # file_name is unique for each ingested file
    return dlt.read_stream("bronze").groupBy("file_name").agg(f.count(f.lit(1)).alias("total"))
Since all I'm doing is counting the rows of newly added files, and files are uniquely named, this should be a stateless operation: count only the rows added within the microbatch, ignore previous ones, and ideally don't keep a state store that would go unused.
But due to the aggregation, it'll be a stateful operation that will also account for previously ingested rows through the state store. This will result in an undesired complete table rewrite (output mode: complete).
A way to minimize this and narrow the scope of the aggregation would be a watermark, but even a minimal one (1s) will hold back the most recent file's counts until the next file arrives and advances the watermark.
An alternative would be using foreachBatch, but this would live outside the pipeline. That's not ideal, since I'd run the pipeline continuously for the lowest latency.
Is there a way to achieve what I'm looking for: a derived DLT table that uses aggregations, but only over new rows, as a stateless operation?
Thursday
It is not possible to achieve truly stateless microbatch aggregations within a Delta Live Tables (DLT) pipeline using the standard declarative aggregation operations: DLT streams are designed to maintain state for aggregations, groupBy, and window functions so that incremental/streaming computations stay correct.
When grouping and aggregating on a column such as file_name, Spark Structured Streaming (the foundation of DLT) must keep every group it has seen in its state store so that it can update results if new rows for the same group arrive in later microbatches.
The streaming aggregation then runs in complete output mode, triggering a full rewrite of the result table and carrying previous aggregations forward, which is not stateless.
Watermarks only help trim state for time-based aggregations and late data, but, as you noted, they can cause recently arrived data to be delayed, waiting for the watermark to "advance" before emitting results for new groups.
Using a short watermark mitigates state growth but does not truly make operations stateless, and introduces potential latency/ingestion delays for recently added files.
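For reference, the watermark workaround would look roughly like the sketch below; the ingest_time column (an ingestion timestamp assumed to exist in bronze) and the one-second values are illustrative, and the time window has to be part of the grouping for append-mode output:

import dlt
from pyspark.sql import functions as f

@dlt.table
def silver():
    # The watermark bounds the state: a group's state is dropped once the watermark
    # passes its window, but counts for the newest file are only emitted after later
    # data arrives and advances the watermark.
    return (dlt.read_stream("bronze")
            .withWatermark("ingest_time", "1 second")
            .groupBy(f.window("ingest_time", "1 second"), "file_name")
            .agg(f.count(f.lit(1)).alias("total")))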
foreachBatch can implement stateless logic (processing only the data in the current batch), but it is imperative rather than declarative, breaking DLT's managed pipeline model. It cannot be wrapped in a basic @dlt.table, so orchestration and lineage benefits are lost.
There is no direct stateless aggregation within DLT's declarative API. The best alternatives are:
- Use Delta merge: Store the raw ingested row counts in a temporary staging location (ideally a Delta table), then perform an upsert ("merge") from this staging table into the silver table using foreachBatch; see the sketch after this list. This way the aggregate result is always up to date and each batch only touches new files, but it must be orchestrated outside of a true @dlt.table context.
- External aggregation logic: Execute Spark jobs outside DLT (using notebooks or jobs) to periodically compute row counts and write them to the target table, preserving statelessness.
- DLT with expectations: If the only metric is ingest rate, consider using DLT's built-in data quality and expectation features to track row counts or failures, although this does not create a custom silver table.
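A minimal sketch of the foreachBatch-plus-merge approach, run as a separate job or notebook outside the DLT pipeline. Here the per-batch counts are merged directly rather than through a staging table; the table names and checkpoint path are illustrative, and the target table is assumed to already exist:

from delta.tables import DeltaTable
from pyspark.sql import functions as f

def upsert_counts(batch_df, batch_id):
    # Aggregate only the rows that arrived in this microbatch (stateless per batch),
    # then merge them into the silver table keyed on file_name.
    counts = batch_df.groupBy("file_name").agg(f.count(f.lit(1)).alias("total"))
    target = DeltaTable.forName(spark, "catalog.schema.silver_file_counts")
    (target.alias("t")
           .merge(counts.alias("s"), "t.file_name = s.file_name")
           .whenMatchedUpdate(set={"total": "t.total + s.total"})  # file split across batches
           .whenNotMatchedInsertAll()
           .execute())

(spark.readStream
      .table("catalog.schema.bronze")  # the bronze table published by the DLT pipeline (illustrative name)
      .writeStream
      .foreachBatch(upsert_counts)
      .option("checkpointLocation", "/Volumes/checkpoints/silver_file_counts")
      .start())

Because each call aggregates only the rows in its microbatch, no streaming state store is involved, and the stream can run continuously (or on a processing-time trigger) to keep latency low.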
To summarize:
- Declarative aggregations in DLT (groupBy/agg) are always stateful because Spark must manage and update previously seen groups.
- For purely stateless batch-level aggregation, imperative approaches (like foreachBatch or external processing) are required, at the expense of breaking DLT's pipeline model.
- Watermarking and windowing help control state scope but do not eliminate state or latency artifacts for newly arrived data.
- There is no supported DLT-native stateless grouping/aggregation on dynamically arriving unique keys (such as file_name) without maintaining state across microbatches.
Thursday
Gotcha. Thanks for the reply.
We've had a medallion architecture in production for a while, based on DLT pipelines holding most bronze/silver tables, with some separate jobs for the silver tables that are more complex to materialize. It works, but I'm not sure we're following best practices.
What would those best practices be, considering bronze tables are populated by autoloaders and some silver tables will require aggregations? I imagine it's a pretty common scenario.
What if I wanted lower latency for the silver tables that can't live in a pipeline I run continuously? Would the best solution be jobs/notebooks leveraging `foreachBatch` that run until stopped?