I have a DLT pipeline with a table that should contain a running aggregation (for simplicity, let's assume it's a count) for each value of a key column, computed over a session window. The input table goes back several years, so to clean up aggregation state I want to add a watermark. With the watermark in place, however, the query outputs no rows.
I believe this is because, in the default append output mode, only expired session windows are emitted; looking at the Delta table's history, I see only appends. How do I configure update output mode in DLT? Or is there another way to achieve my goal?
Here is my (simplified) pipeline definition:

    import dlt
    from pyspark.sql.functions import count, session_window

    @dlt.table
    def running_aggregation():
        return (
            spark.readStream
            .option("withEventTimeOrder", "true")
            .table("LIVE.input_data")
            # The watermark combined with append output mode (I don't know
            # how to change the output mode in DLT) seems to result in only
            # expired session windows being emitted.
            .withWatermark("created", "365 days")
            .groupBy(session_window("created", "90 days"), "key")
            .agg(count("*").alias("count"))
        )
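For reference, this is roughly how I inspected the table's history to conclude that only appends are happening. (I'm assuming here that the materialized table is reachable under the same name as the function; adjust the catalog/schema for your setup.)

```sql
-- Show the Delta commit history of the materialized table.
-- In my case every entry appears to be an append-style streaming write,
-- with no updates or deletes of previously emitted rows.
DESCRIBE HISTORY running_aggregation;
```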