Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How do I update an aggregate table using a Delta live table

Jennifer
New Contributor III

I am using Delta Live Tables to stream events. I have a raw table for all the events and a downstream aggregate table. I need to add each newly aggregated number to the aggregate column of the downstream table, but I haven't found any recipe that covers this.

My code is similar to the following.

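import dlt
from pyspark.sql import functions as F

@dlt.table()
def my_raw_table():
    # Auto Loader: incrementally ingest Parquet files from dirs
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("recursiveFileLookup", "true")
        .load(dirs)
    )

@dlt.table()
def my_aggregate_table():
    # Complete (non-streaming) read of the raw table
    return (
        dlt.read("my_raw_table")
        .groupBy("col_id")
        .agg((F.max("col_a") - F.min("col_a")).alias("col_aggr"))
    )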

What I need is to increment col_aggr in my_aggregate_table by the new aggregated value computed from my_raw_table.

The CDC documentation for Delta Live Tables only seems to cover overwriting a row with the new value. It doesn't show a way to increment the aggregate column col_aggr by the new aggregate value.

I would like something in Delta Live Tables similar to what can be done with a Delta Lake table, as shown below:

# my_aggregate_table is a DeltaTable; my_raw_table is a DataFrame with col_id and col_aggr
(
    my_aggregate_table.alias("aggr")
    .merge(
        my_raw_table.alias("updates"),
        "aggr.col_id = updates.col_id"
    )
    .whenMatchedUpdate(set={
        "col_id": "updates.col_id",
        # add the new aggregate to the stored value
        "col_aggr": "aggr.col_aggr + updates.col_aggr"
    })
    .whenNotMatchedInsert(values={
        "col_id": "updates.col_id",
        "col_aggr": "updates.col_aggr"
    })
    .execute()
)

Is there any way to achieve this in Delta Live Tables? Or did I misunderstand how Delta Live Tables work for aggregate tables?
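To make the intended behavior concrete, here is a minimal plain-Python sketch of the "add on match, insert otherwise" semantics that the merge is meant to express. It is only a model: the dictionaries stand in for the two tables, and merge_increment is a hypothetical helper name, not part of Delta Lake or Delta Live Tables.

```python
def merge_increment(aggr, updates):
    """Return a new dict: add to matched keys, insert unmatched keys.

    aggr and updates both map col_id -> col_aggr (hypothetical stand-ins
    for the aggregate table and the incoming batch of aggregates).
    """
    merged = dict(aggr)  # copy so the input is not mutated
    for col_id, delta in updates.items():
        if col_id in merged:
            merged[col_id] += delta  # corresponds to whenMatchedUpdate
        else:
            merged[col_id] = delta   # corresponds to whenNotMatchedInsert
    return merged


current = {"a": 10, "b": 4}
new_batch = {"a": 3, "c": 7}
result = merge_increment(current, new_batch)
print(result)
```

Key "a" is matched and incremented, "b" is left untouched, and "c" is inserted.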

1 REPLY

Jennifer
New Contributor III

Maybe my code is already correct, since I use dlt.read("my_raw_table") instead of dlt.read_stream("my_raw_table"). That means col_aggr is recalculated completely every time my_raw_table is updated.
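A small plain-Python sketch may help show why a complete recalculation is the safer choice for this particular aggregate. It assumes two hypothetical batches of (col_id, col_a) rows and does not use any Delta Live Tables API. Because max(col_a) - min(col_a) is not additive, summing per-batch results can differ from the value computed over all rows.

```python
from collections import defaultdict


def aggregate_range(rows):
    """Compute max(col_a) - min(col_a) per col_id over the given rows."""
    values = defaultdict(list)
    for col_id, col_a in rows:
        values[col_id].append(col_a)
    return {col_id: max(v) - min(v) for col_id, v in values.items()}


# Hypothetical data: two batches arriving one after the other
batch_1 = [("a", 1), ("a", 5), ("b", 2)]
batch_2 = [("a", 9), ("b", 2), ("b", 8)]

# Full recompute over everything seen so far (what a complete read yields)
full = aggregate_range(batch_1 + batch_2)

# Incremental alternative: add each batch's own range to the running value
r1 = aggregate_range(batch_1)
r2 = aggregate_range(batch_2)
summed = {k: r1.get(k, 0) + r2.get(k, 0) for k in r1.keys() | r2.keys()}

print("full recompute:", full)
print("sum of per-batch ranges:", summed)
```

For col_id "a" the two approaches disagree (8 versus 4), so incrementing col_aggr batch by batch would not reproduce the range over the whole raw table.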