<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Is it possible to join two aggregated streams of data? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/is-it-possible-to-join-two-aggregated-streams-of-data/m-p/32229#M23494</link>
    <description>&lt;P&gt;&lt;B&gt;&lt;U&gt;Objective&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Within the context of a Delta Live Table, I'm trying to join two streaming aggregations, but I'm running into challenges. Is it possible to achieve such a join?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;&lt;U&gt;Context&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Assume:&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;trades&lt;/I&gt; stores a list of trades with their associated timestamps&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;trades_1d&lt;/I&gt; sums the value of all the trades on a given day&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;stock_price&lt;/I&gt; stores a given stock price at some (non-constant) sampling frequency&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;stock_price_1d&lt;/I&gt; averages the stock price over a given day&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;It would translate to something along these lines:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import pyspark.sql.functions as sqlf
&amp;nbsp;
sdf_trades = spark.readStream.format("delta").table(f"stock_price")
sdf_price = spark.readStream.format("delta").table(f"stock_price")
&amp;nbsp;
&amp;nbsp;
w = sqlf.window("timetstamp", "24 hours")
&amp;nbsp;
sdf_trades_1d = (
    sdf_trades
    .groupby(w)
    .agg(sqlf.sum("trade_value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
)
                
sdf_price_1d = (
    sdf_price
    .groupby(w)
    .agg(sqlf.avg("value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
).withWatermark("window_end", "48 hours")
&amp;nbsp;
sdf = sdf_trades_1d.join(sdf_price_1d, "window_end", "left")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;&lt;U&gt;Issue&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;When running the pseudocode above, I get:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;"Append more error: Multiple streaming aggregations are not supported with stream DataFrames/Datasets"&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Any suggestions on how I can make this work?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 08 Sep 2022 17:10:56 GMT</pubDate>
    <dc:creator>osoucy</dc:creator>
    <dc:date>2022-09-08T17:10:56Z</dc:date>
    <item>
      <title>Is it possible to join two aggregated streams of data?</title>
      <link>https://community.databricks.com/t5/data-engineering/is-it-possible-to-join-two-aggregated-streams-of-data/m-p/32229#M23494</link>
      <description>&lt;P&gt;&lt;B&gt;&lt;U&gt;Objective&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Within the context of a Delta Live Table, I'm trying to join two streaming aggregations, but I'm running into challenges. Is it possible to achieve such a join?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;&lt;U&gt;Context&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Assume:&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;trades&lt;/I&gt; stores a list of trades with their associated timestamps&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;trades_1d&lt;/I&gt; sums the value of all the trades on a given day&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;stock_price&lt;/I&gt; stores a given stock price at some (non-constant) sampling frequency&lt;/P&gt;&lt;P&gt;- table &lt;I&gt;stock_price_1d&lt;/I&gt; averages the stock price over a given day&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;It would translate to something along these lines:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import pyspark.sql.functions as sqlf
&amp;nbsp;
sdf_trades = spark.readStream.format("delta").table(f"stock_price")
sdf_price = spark.readStream.format("delta").table(f"stock_price")
&amp;nbsp;
&amp;nbsp;
w = sqlf.window("timetstamp", "24 hours")
&amp;nbsp;
sdf_trades_1d = (
    sdf_trades
    .groupby(w)
    .agg(sqlf.sum("trade_value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
)
                
sdf_price_1d = (
    sdf_price
    .groupby(w)
    .agg(sqlf.avg("value"))
    .withColumn("window_end", sqlf.col("window.end"))
    .withColumn("window_start", sqlf.col("window.start"))
).withWatermark("window_end", "48 hours")
&amp;nbsp;
sdf = sdf_trades_1d.join(sdf_price_1d, "window_end", "left")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;&lt;U&gt;Issue&lt;/U&gt;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;When running the pseudocode above, I get:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;"Append more error: Multiple streaming aggregations are not supported with stream DataFrames/Datasets"&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Any suggestions on how I can make this work?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 08 Sep 2022 17:10:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/is-it-possible-to-join-two-aggregated-streams-of-data/m-p/32229#M23494</guid>
      <dc:creator>osoucy</dc:creator>
      <dc:date>2022-09-08T17:10:56Z</dc:date>
    </item>
  </channel>
</rss>

