
Stateless streaming with aggregations on a DLT/Lakeflow pipeline

rcostanza
New Contributor III

In a DLT pipeline I have a bronze table that ingests files using Autoloader, and a derived silver table that, for this example, just stores the number of rows for each file ingested into bronze. A basic code example:
import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze():
    # Autoloader ingestion; source path and format are placeholders
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/path/to/landing")
    )

@dlt.table
def silver():
    # file_name is unique for each ingested file
    return (
        dlt.read_stream("bronze")
        .groupBy("file_name")
        .agg(f.count(f.lit(1)).alias("total"))
    )

Since all I'm doing is counting the rows of newly added files, and files are uniquely named, this should be a stateless operation: count only the rows that arrived within the microbatch, ignore previous ones, and ideally keep no state store that would go unused.

But due to the aggregation, it becomes a stateful operation that also accounts for previously ingested rows through the state store. This results in an undesired rewrite of the entire table on every update (output mode: complete).

A way to limit this, and the scope of the aggregation, would be a watermark, but even a minimal one (1 second) prevents the most recent file's counts from being emitted until the next file arrives and advances the watermark.
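For reference, a minimal sketch of that watermark variant, assuming bronze exposes an ingestion-timestamp column (ingest_ts here is a made-up name) and grouping by a short event-time window so results can be emitted in append mode:

@dlt.table
def silver_watermarked():
    # Sketch only: ingest_ts is assumed to exist on bronze.
    # The watermark bounds the state store, but a window's count is
    # emitted only once the watermark passes the window's end, i.e.
    # after later data arrives.
    return (
        dlt.read_stream("bronze")
        .withWatermark("ingest_ts", "1 second")
        .groupBy("file_name", f.window("ingest_ts", "1 second"))
        .agg(f.count(f.lit(1)).alias("total"))
    )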

An alternative would be using foreachBatch, but that would live outside the pipeline. That's not ideal, since I'd run the pipeline continuously for the lowest latency.
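A sketch of that alternative as a standalone job outside the pipeline (the target table name and checkpoint path are made up here); the aggregation runs only over each microbatch's rows, so no streaming state is kept:

from pyspark.sql import functions as f

def count_new_files(batch_df, batch_id):
    # batch_df contains only the rows new to this microbatch,
    # so the groupBy is batch-local and keeps no state store
    (batch_df
        .groupBy("file_name")
        .agg(f.count(f.lit(1)).alias("total"))
        .write.mode("append")
        .saveAsTable("silver_file_counts"))

(spark.readStream
    .table("bronze")
    .writeStream
    .foreachBatch(count_new_files)
    .option("checkpointLocation", "/tmp/checkpoints/silver_file_counts")
    .start())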

Is there a way to achieve what I'm looking for: a derived DLT table that uses aggregations, but only over new rows, as a stateless operation?
