<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Structured streaming from an overwrite delta path in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/structured-streaming-from-an-overwrite-delta-path/m-p/16940#M11031</link>
<description>&lt;P&gt;Hi experts,&lt;/P&gt;
&lt;P&gt;I need to ingest data from an existing Delta path into my own Delta lake.&lt;/P&gt;
&lt;P&gt;The data flow is shown in the diagram below:&lt;/P&gt;
&lt;UL&gt;&lt;LI&gt;The data team reads a full snapshot of a database table and overwrites it to a Delta path. This happens several times per day, but not on a fixed schedule.&lt;/LI&gt;&lt;/UL&gt;
&lt;P&gt;I need to stream every data change into my own Delta lake for downstream consumption: essentially a copy of the source Delta lake, but with longer log &amp;amp; data retention periods to enable time travel over 3 years.&lt;/P&gt;
&lt;P&gt; &lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoRcAAK"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2463i64BB3BB88EFED291/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoRcAAK" alt="0693f000007OoRcAAK" /&gt;&lt;/span&gt; &lt;/P&gt;
&lt;P&gt;I have tried the following code:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;def overwrite_microbatch(microdf, batchId):
  # Replace the sink table with the latest full snapshot from this micro-batch
  microdf.write.format("delta").mode("overwrite").save(sink_path)

(spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")  # tolerate rewritten files from upstream overwrites
  .load(source_path)
  .writeStream
  .foreachBatch(overwrite_microbatch)
  .option("checkpointLocation", checkpoint_path)
  .start())&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;&lt;I&gt;(.writeStream.format("delta").outputMode("append") does not work: "append" mode causes duplication, and writeStream does not support an "overwrite" mode.)&lt;/I&gt;&lt;/P&gt;
&lt;P&gt;This works, but I ran into two problems:&lt;/P&gt;
&lt;OL&gt;&lt;LI&gt;The sink path is not storage-optimized: each version stores a full table snapshot in a .snappy.parquet file instead of only the incremental changes.&lt;/LI&gt;&lt;LI&gt;If my streaming job fails to consume one or more source versions, the next micro-batch contains a concatenation of two or more unconsumed versions, which again causes duplication in the sink path.&lt;/LI&gt;&lt;/OL&gt;
&lt;P&gt;What would be the right approach for this scenario?&lt;/P&gt;
&lt;P&gt;Any idea is very much appreciated. Thanks!&lt;/P&gt;
&lt;P&gt;Best Regards,&lt;/P&gt;
&lt;P&gt;Vu&lt;/P&gt;</description>
    <pubDate>Mon, 02 Aug 2021 13:45:16 GMT</pubDate>
    <dc:creator>Vu_QuangNguyen</dc:creator>
    <dc:date>2021-08-02T13:45:16Z</dc:date>
    <item>
      <title>Structured streaming from an overwrite delta path</title>
      <link>https://community.databricks.com/t5/data-engineering/structured-streaming-from-an-overwrite-delta-path/m-p/16940#M11031</link>
<description>&lt;P&gt;Hi experts,&lt;/P&gt;
&lt;P&gt;I need to ingest data from an existing Delta path into my own Delta lake.&lt;/P&gt;
&lt;P&gt;The data flow is shown in the diagram below:&lt;/P&gt;
&lt;UL&gt;&lt;LI&gt;The data team reads a full snapshot of a database table and overwrites it to a Delta path. This happens several times per day, but not on a fixed schedule.&lt;/LI&gt;&lt;/UL&gt;
&lt;P&gt;I need to stream every data change into my own Delta lake for downstream consumption: essentially a copy of the source Delta lake, but with longer log &amp;amp; data retention periods to enable time travel over 3 years.&lt;/P&gt;
&lt;P&gt; &lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoRcAAK"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2463i64BB3BB88EFED291/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoRcAAK" alt="0693f000007OoRcAAK" /&gt;&lt;/span&gt; &lt;/P&gt;
&lt;P&gt;I have tried the following code:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;def overwrite_microbatch(microdf, batchId):
  # Replace the sink table with the latest full snapshot from this micro-batch
  microdf.write.format("delta").mode("overwrite").save(sink_path)

(spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")  # tolerate rewritten files from upstream overwrites
  .load(source_path)
  .writeStream
  .foreachBatch(overwrite_microbatch)
  .option("checkpointLocation", checkpoint_path)
  .start())&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;&lt;I&gt;(.writeStream.format("delta").outputMode("append") does not work: "append" mode causes duplication, and writeStream does not support an "overwrite" mode.)&lt;/I&gt;&lt;/P&gt;
&lt;P&gt;This works, but I ran into two problems:&lt;/P&gt;
&lt;OL&gt;&lt;LI&gt;The sink path is not storage-optimized: each version stores a full table snapshot in a .snappy.parquet file instead of only the incremental changes.&lt;/LI&gt;&lt;LI&gt;If my streaming job fails to consume one or more source versions, the next micro-batch contains a concatenation of two or more unconsumed versions, which again causes duplication in the sink path.&lt;/LI&gt;&lt;/OL&gt;
&lt;P&gt;What would be the right approach for this scenario?&lt;/P&gt;
&lt;P&gt;Any idea is very much appreciated. Thanks!&lt;/P&gt;
&lt;P&gt;Best Regards,&lt;/P&gt;
&lt;P&gt;Vu&lt;/P&gt;</description>
      <pubDate>Mon, 02 Aug 2021 13:45:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/structured-streaming-from-an-overwrite-delta-path/m-p/16940#M11031</guid>
      <dc:creator>Vu_QuangNguyen</dc:creator>
      <dc:date>2021-08-02T13:45:16Z</dc:date>
    </item>
  </channel>
</rss>

