<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic How do I update an aggregate table using a Delta live table in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-do-i-update-an-aggregate-table-using-a-delta-live-table/m-p/11743#M6677</link>
    <description>&lt;P&gt;I am using Delta Live Tables to stream events. I have a raw table containing all the events and a downstream aggregate table. I need to add each newly aggregated number to the aggregate column of the downstream table, but I haven't found any recipe that covers this.&lt;/P&gt;&lt;P&gt;My code is similar to this:&lt;/P&gt;&lt;P&gt;import dlt&lt;/P&gt;&lt;P&gt;from pyspark.sql import functions as F&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table()&lt;/P&gt;&lt;P&gt;def my_raw_table():&lt;/P&gt;&lt;P&gt;      return (&lt;/P&gt;&lt;P&gt;           spark.readStream.format("cloudFiles")&lt;/P&gt;&lt;P&gt;           .option("cloudFiles.format", "parquet")&lt;/P&gt;&lt;P&gt;           .option("recursiveFileLookup", "true")&lt;/P&gt;&lt;P&gt;           .load(dirs)&lt;/P&gt;&lt;P&gt;      )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table()&lt;/P&gt;&lt;P&gt;def my_aggregate_table():&lt;/P&gt;&lt;P&gt;     return (&lt;/P&gt;&lt;P&gt;           dlt.read("my_raw_table")&lt;/P&gt;&lt;P&gt;           .groupBy("col_id")&lt;/P&gt;&lt;P&gt;           .agg((F.max("col_a") - F.min("col_a")).alias("col_aggr"))&lt;/P&gt;&lt;P&gt;     )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;What I need to do is add the new aggregated value computed from my_raw_table to col_aggr in my_aggregate_table.&lt;/P&gt;&lt;P&gt;The &lt;A href="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html" alt="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html" target="_blank"&gt;CDC with Delta Live Tables documentation&lt;/A&gt; only seems to cover replacing a row with the new values; it doesn't show a way to increment the aggregate col_aggr by the new aggregate value.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I would like to do something in Delta Live Tables similar to what can be done with a Delta Lake table:&lt;/P&gt;&lt;P&gt;(my_aggregate_table.alias("aggr")&lt;/P&gt;&lt;P&gt;.merge(&lt;/P&gt;&lt;P&gt;       my_raw_table.alias("updates"),&lt;/P&gt;&lt;P&gt;       "aggr.col_id = updates.col_id"&lt;/P&gt;&lt;P&gt;       )&lt;/P&gt;&lt;P&gt;.whenMatchedUpdate(set = &lt;/P&gt;&lt;P&gt;      {&lt;/P&gt;&lt;P&gt;        "col_id": "updates.col_id",&lt;/P&gt;&lt;P&gt;        "col_aggr": "aggr.col_aggr + updates.col_aggr"&lt;/P&gt;&lt;P&gt;      }&lt;/P&gt;&lt;P&gt;     )&lt;/P&gt;&lt;P&gt;.whenNotMatchedInsert(values = &lt;/P&gt;&lt;P&gt;     {&lt;/P&gt;&lt;P&gt;        "col_id": "updates.col_id",&lt;/P&gt;&lt;P&gt;        "col_aggr": "updates.col_aggr"&lt;/P&gt;&lt;P&gt;     }&lt;/P&gt;&lt;P&gt;   ).execute())&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Is there any way to achieve this in Delta Live Tables? Or have I misunderstood how Delta Live Tables work for aggregate tables?&lt;/P&gt;</description>
    <pubDate>Mon, 16 Jan 2023 11:37:43 GMT</pubDate>
    <dc:creator>Jennifer</dc:creator>
    <dc:date>2023-01-16T11:37:43Z</dc:date>
    <item>
      <title>How do I update an aggregate table using a Delta live table</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-update-an-aggregate-table-using-a-delta-live-table/m-p/11743#M6677</link>
      <description>&lt;P&gt;I am using Delta Live Tables to stream events. I have a raw table containing all the events and a downstream aggregate table. I need to add each newly aggregated number to the aggregate column of the downstream table, but I haven't found any recipe that covers this.&lt;/P&gt;&lt;P&gt;My code is similar to this:&lt;/P&gt;&lt;P&gt;import dlt&lt;/P&gt;&lt;P&gt;from pyspark.sql import functions as F&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table()&lt;/P&gt;&lt;P&gt;def my_raw_table():&lt;/P&gt;&lt;P&gt;      return (&lt;/P&gt;&lt;P&gt;           spark.readStream.format("cloudFiles")&lt;/P&gt;&lt;P&gt;           .option("cloudFiles.format", "parquet")&lt;/P&gt;&lt;P&gt;           .option("recursiveFileLookup", "true")&lt;/P&gt;&lt;P&gt;           .load(dirs)&lt;/P&gt;&lt;P&gt;      )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;@dlt.table()&lt;/P&gt;&lt;P&gt;def my_aggregate_table():&lt;/P&gt;&lt;P&gt;     return (&lt;/P&gt;&lt;P&gt;           dlt.read("my_raw_table")&lt;/P&gt;&lt;P&gt;           .groupBy("col_id")&lt;/P&gt;&lt;P&gt;           .agg((F.max("col_a") - F.min("col_a")).alias("col_aggr"))&lt;/P&gt;&lt;P&gt;     )&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;What I need to do is add the new aggregated value computed from my_raw_table to col_aggr in my_aggregate_table.&lt;/P&gt;&lt;P&gt;The &lt;A href="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html" alt="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html" target="_blank"&gt;CDC with Delta Live Tables documentation&lt;/A&gt; only seems to cover replacing a row with the new values; it doesn't show a way to increment the aggregate col_aggr by the new aggregate value.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I would like to do something in Delta Live Tables similar to what can be done with a Delta Lake table:&lt;/P&gt;&lt;P&gt;(my_aggregate_table.alias("aggr")&lt;/P&gt;&lt;P&gt;.merge(&lt;/P&gt;&lt;P&gt;       my_raw_table.alias("updates"),&lt;/P&gt;&lt;P&gt;       "aggr.col_id = updates.col_id"&lt;/P&gt;&lt;P&gt;       )&lt;/P&gt;&lt;P&gt;.whenMatchedUpdate(set = &lt;/P&gt;&lt;P&gt;      {&lt;/P&gt;&lt;P&gt;        "col_id": "updates.col_id",&lt;/P&gt;&lt;P&gt;        "col_aggr": "aggr.col_aggr + updates.col_aggr"&lt;/P&gt;&lt;P&gt;      }&lt;/P&gt;&lt;P&gt;     )&lt;/P&gt;&lt;P&gt;.whenNotMatchedInsert(values = &lt;/P&gt;&lt;P&gt;     {&lt;/P&gt;&lt;P&gt;        "col_id": "updates.col_id",&lt;/P&gt;&lt;P&gt;        "col_aggr": "updates.col_aggr"&lt;/P&gt;&lt;P&gt;     }&lt;/P&gt;&lt;P&gt;   ).execute())&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Is there any way to achieve this in Delta Live Tables? Or have I misunderstood how Delta Live Tables work for aggregate tables?&lt;/P&gt;</description>
      <pubDate>Mon, 16 Jan 2023 11:37:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-update-an-aggregate-table-using-a-delta-live-table/m-p/11743#M6677</guid>
      <dc:creator>Jennifer</dc:creator>
      <dc:date>2023-01-16T11:37:43Z</dc:date>
    </item>
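    <!--
The MERGE in the question adds the new batch's col_aggr to the stored one. For this particular aggregate that would over-count, because max(col_a) - min(col_a) is not additive across batches: the rows {1, 5} and {3, 7} each have a range of 4, yet the range of all four rows is 6, not 8. An incremental approach would instead have to keep a running max and min per col_id and derive col_aggr from them. The pure-Python sketch below models only that per-key state under stated assumptions: the names merge_batch and col_aggr and the (col_id, col_a) row layout are hypothetical, and it uses no Delta Live Tables or Delta Lake API.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical per-key state: col_id -> (running max of col_a, running min of col_a)
State = Dict[str, Tuple[float, float]]


def merge_batch(state: State, batch: Iterable[Tuple[str, float]]) -> State:
    """Fold a batch of (col_id, col_a) rows into the running max/min state, in place."""
    for col_id, col_a in batch:
        if col_id in state:
            # Matched key: widen the stored range (analogous to whenMatchedUpdate).
            cur_max, cur_min = state[col_id]
            state[col_id] = (max(cur_max, col_a), min(cur_min, col_a))
        else:
            # New key: start its range at this single value (analogous to whenNotMatchedInsert).
            state[col_id] = (col_a, col_a)
    return state


def col_aggr(state: State) -> Dict[str, float]:
    """Derive col_aggr = max(col_a) - min(col_a) per col_id from the state."""
    return {col_id: hi - lo for col_id, (hi, lo) in state.items()}


if __name__ == "__main__":
    state: State = {}
    merge_batch(state, [("a", 1), ("a", 5)])  # batch 1 alone has range 4
    merge_batch(state, [("a", 3), ("a", 7)])  # batch 2 alone has range 4
    print(col_aggr(state))  # {'a': 6}, not 4 + 4 = 8
```

Each call to merge_batch plays the role of one micro-batch MERGE: matched keys are updated and unmatched keys are inserted. Storing the max and the min separately, rather than only their difference, is what makes the result independent of how the rows are split into batches.
    -->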
    <item>
      <title>Re: How do I update an aggregate table using a Delta live table</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-update-an-aggregate-table-using-a-delta-live-table/m-p/11744#M6678</link>
      <description>&lt;P&gt;Maybe my code is already correct, since I use dlt.read("my_raw_table") rather than dlt.read_stream("my_raw_table"), so col_aggr is completely recalculated every time my_raw_table is updated.&lt;/P&gt;</description>
      <pubDate>Mon, 16 Jan 2023 11:57:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-update-an-aggregate-table-using-a-delta-live-table/m-p/11744#M6678</guid>
      <dc:creator>Jennifer</dc:creator>
      <dc:date>2023-01-16T11:57:33Z</dc:date>
    </item>
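    <!--
The reasoning in the reply can be modeled with a minimal pure-Python sketch: if the aggregate is rebuilt from all rows of the raw table on every update, col_aggr stays correct without any MERGE or increment step. The function name recompute_aggregate and the (col_id, col_a) row layout are hypothetical, and this is a simplified model of a dlt.read-based table under that assumption, not a description of how Delta Live Tables executes it.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def recompute_aggregate(raw_rows: Iterable[Tuple[str, float]]) -> Dict[str, float]:
    """Rebuild col_aggr = max(col_a) - min(col_a) per col_id from every raw row.

    Simplified model (an assumption, not DLT internals) of a table defined with
    dlt.read: its result reflects the whole source table, not only new rows.
    """
    values: Dict[str, List[float]] = defaultdict(list)
    for col_id, col_a in raw_rows:
        values[col_id].append(col_a)
    return {col_id: max(vals) - min(vals) for col_id, vals in values.items()}


if __name__ == "__main__":
    raw = [("a", 1), ("a", 5)]
    print(recompute_aggregate(raw))  # {'a': 4}
    raw += [("a", 3), ("a", 7)]  # new events land in the raw table
    print(recompute_aggregate(raw))  # {'a': 6}
```

The trade-off is cost: every update rescans the full raw table, whereas an incremental approach processes only new rows but must store enough per-key state to stay correct.
    -->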
  </channel>
</rss>

