Streaming read and writing with aggregation
Posted in Data Engineering by Guillermo-HR on 2026-04-30 05:41 GMT
https://community.databricks.com/t5/data-engineering/streaming-read-and-writing-with-aggregation/m-p/155848#M54323
Hi,

I have the following problem: in a medallion architecture, I receive files in a bronze volume every month, containing the readings for each sensor over the period from the 1st of the month at 00:00 to the last day at 23:00. A manually triggered job calls the Python files that load and transform the data from landing -> bronze -> silver. Now I want to add the aggregated data to a gold table. I have the following code:

from pyspark.sql import functions as F

# Stream the silver table; the watermark bounds the event-time lateness
# tolerated by the windowed aggregation below.
df_silver_swell_metrics = (
    spark.readStream
        .format("delta")
        .table("cor_project.silver.swell_metrics")
        .withWatermark("datetime", "0 seconds")
)

# One aggregate row per coast per 1-day event-time window.
df_silver_swell_metrics_transformed = (
    df_silver_swell_metrics
    .groupBy(
        F.window("datetime", "1 day").alias("window"),
        "coast_name"
    ).agg(
        ...
    )
)

df_gold_wave_daily_summary = (
    df_silver_swell_metrics_transformed
    .select(
        ...
    )
)

# Append finalized windows to the gold table, processing all available
# data and then stopping.
(
    df_gold_wave_daily_summary.writeStream
        .format("delta")
        .outputMode("append")
        .trigger(availableNow=True)
        .option("checkpointLocation", f"/Volumes/cor_{ambiente}/gold/data/checkpoints/wave_daily_summary")
        .toTable("cor_project.gold.wave_daily_summary")
)

This works, and it doesn't reprocess data that has already been handled, but it always misses the last day of the month. I have asked in other groups and was told to use batch processing. Any thoughts on possible solutions?

Thanks for any comments.
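The symptom matches append-mode semantics in Structured Streaming: in append mode, a window's aggregate is emitted only once the watermark passes the end of that window, and the watermark only advances when later events arrive. Because each monthly file stops at the last day 23:00, nothing pushes the watermark past the final day's window, so that day is held back until the next month's file lands. One common workaround is to keep the stream but write through foreachBatch in update mode, merging each batch's (possibly still-open) windows into the gold table. A minimal sketch under those assumptions; upsert_daily_summary, the merge keys, and the fresh checkpoint path are illustrative, and the gold table is assumed to already exist:

from delta.tables import DeltaTable

def upsert_daily_summary(batch_df, batch_id):
    # Upsert every window emitted in this micro-batch, open or closed,
    # keyed on the window struct plus the coast.
    (
        DeltaTable.forName(spark, "cor_project.gold.wave_daily_summary")
        .alias("t")
        .merge(
            batch_df.alias("s"),
            "t.window = s.window AND t.coast_name = s.coast_name",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    df_gold_wave_daily_summary.writeStream
        .outputMode("update")  # emit windows whenever their aggregate changes
        .trigger(availableNow=True)
        .option("checkpointLocation", f"/Volumes/cor_{ambiente}/gold/data/checkpoints/wave_daily_summary_merge")  # new sink, new checkpoint
        .foreachBatch(upsert_daily_summary)
        .start()
)

Because the merge is keyed on (window, coast_name), re-emitting a still-open window on a later run simply overwrites the earlier partial row, so the gold table converges to complete daily summaries without duplicates.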
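Alternatively, since the monthly loads are already driven by a manual job, the batch approach suggested in the other groups fits naturally: re-aggregate on each run and upsert, with no watermark involved, so no window is ever withheld. Another sketch, with the same illustrative merge keys and a placeholder aggregate standing in for the elided ones:

from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Plain batch: aggregate the silver table (or only the newly loaded month,
# if it can be filtered) and upsert the resulting daily rows.
df_daily = (
    spark.read.table("cor_project.silver.swell_metrics")
    .groupBy(
        F.window("datetime", "1 day").alias("window"),
        "coast_name",
    )
    .agg(F.count("*").alias("n_readings"))  # placeholder aggregate
)

(
    DeltaTable.forName(spark, "cor_project.gold.wave_daily_summary")
    .alias("t")
    .merge(
        df_daily.alias("s"),
        "t.window = s.window AND t.coast_name = s.coast_name",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

Here idempotency comes from the merge rather than from a streaming checkpoint, so rerunning a month is safe.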

