I am using Delta Live Tables to stream events. I have a raw table for all the events and a downstream aggregate table, and I need to add each newly aggregated number to the downstream table's aggregate column, but I haven't found any recipe covering this.
My code is similar to this:
import dlt
from pyspark.sql.functions import col, max, min

@dlt.table()
def my_raw_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("recursiveFileLookup", "true")
        .load(dirs)
    )

@dlt.table()
def my_aggregate_table():
    return (
        dlt.read("my_raw_table")
        .groupBy("col_id")
        .agg((max(col("col_a")) - min(col("col_a"))).alias("col_aggr"))
    )
What I need to do is to add col_aggr in my_aggregate_table with the new aggregated value from my_raw_table.
The CDC with delta live table documentation seems only update with the new value, but didn't provide a way to increment the aggregate col_aggr with the new aggregate value.
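For context, the CDC recipe from the docs looks roughly like this (my own sketch; the sequence_by column event_ts is a placeholder I made up):

import dlt
from pyspark.sql.functions import col

# Sketch of the documented CDC pattern; event_ts is a placeholder column.
dlt.create_streaming_table("my_aggregate_table")

dlt.apply_changes(
    target="my_aggregate_table",
    source="my_raw_table",
    keys=["col_id"],
    sequence_by=col("event_ts"),
)
# As far as I can tell, this upserts the source values as-is; there is no
# hook to compute aggr.col_aggr + updates.col_aggr during the update.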
I would like something in Delta Live Tables similar to what can be done with a plain Delta Lake table, as below:
from delta.tables import DeltaTable

# my_aggregate_table is assumed to be a registered Delta table, and the
# "updates" source to already carry the per-batch col_aggr for each col_id
my_aggregate_table = DeltaTable.forName(spark, "my_aggregate_table")

(
    my_aggregate_table.alias("aggr")
    .merge(
        my_raw_table.alias("updates"),
        "aggr.col_id = updates.col_id",
    )
    .whenMatchedUpdate(set=
        {
            "col_id": "updates.col_id",
            # add the new batch's aggregate to the running total
            "col_aggr": "aggr.col_aggr + updates.col_aggr",
        }
    )
    .whenNotMatchedInsert(values=
        {
            "col_id": "updates.col_id",
            "col_aggr": "updates.col_aggr",
        }
    )
    .execute()
)
Is there any way to achieve this in Delta Live Tables? Or have I misunderstood how Delta Live Tables work for aggregate tables?