<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Dynamic partition overwrite with Streaming Data in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63904#M32395</link>
    <description>&lt;P&gt;Thanks for your answer,&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;. I tried what you suggested with the following PySpark code. I also added some lines to handle batches that span multiple files/partitions, for example the first ingestion, where I'm reading historical data:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import col

output_location = 'path'  # placeholder; the paths below assume it ends with '/'

def upsert_to_parquet(batchDF, batchId):
    # Extract partition keys (year, month, day) from batchDF
    # In case more than one partition is updated iterate over them
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']

        # Construct the target path based on partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'

        # Overwrite existing data in the target partition
        batchDF.filter((col('year') == year) &amp;amp; (col('month') == month) &amp;amp; (col('day') == day)).write.mode('overwrite').parquet(partition_path)


df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow = True) \
    .start()&lt;/LI-CODE&gt;&lt;P&gt;This appears to solve the duplication on updates. However, there are some additional issues:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Some extra log files appear in every partition directory: &lt;EM&gt;_committed_xxx, _started_xxx, _SUCCESS&lt;/EM&gt;&lt;/LI&gt;&lt;LI&gt;Each partition gets two .parquet files:&amp;nbsp;&lt;EM&gt;part-0000-xxx.snappy.parquet, part-0001-xxx.snappy.parquet&lt;/EM&gt;. The first is empty, containing only the schema information (column names), and the second contains all the data.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Ideally, only one Parquet file should be created. Do you know why this happens?&lt;/P&gt;</description>
    <pubDate>Sat, 16 Mar 2024 17:55:17 GMT</pubDate>
    <dc:creator>Jorge3</dc:creator>
    <dc:date>2024-03-16T17:55:17Z</dc:date>
    <item>
      <title>Dynamic partition overwrite with Streaming Data</title>
      <link>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63785#M32349</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm working on a job that propagates updates from a Delta table to Parquet files (a requirement of the consumer). The data is partitioned by day (year &amp;gt; month &amp;gt; day), and the daily data is updated every hour. I'm using a streaming table read with a checkpoint to load new data incrementally (appends and updates). The issue is that every time new data is loaded, the job creates a new .parquet file instead of overwriting the old one.&lt;/P&gt;&lt;P&gt;The dynamic partition overwrite mode does exactly this, but I tried it and it didn't work with the writeStream method.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day").save("/path")&lt;/LI-CODE&gt;&lt;P&gt;I want to implement something similar to dynamic partition overwrite, but for incremental streaming data: a new day's data creates a new partition/file, and subsequent updates overwrite that latest partition.&lt;/P&gt;&lt;P&gt;Does anyone know of a way to use this mode with streaming data?&lt;/P&gt;&lt;P&gt;Thanks in advance,&lt;BR /&gt;Jorge&lt;/P&gt;</description>
      <pubDate>Fri, 15 Mar 2024 08:59:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63785#M32349</guid>
      <dc:creator>Jorge3</dc:creator>
      <dc:date>2024-03-15T08:59:28Z</dc:date>
    </item>
    <item>
      <title>Re: Dynamic partition overwrite with Streaming Data</title>
      <link>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63904#M32395</link>
      <description>&lt;P&gt;Thanks for your answer,&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;. I tried what you suggested with the following PySpark code. I also added some lines to handle batches that span multiple files/partitions, for example the first ingestion, where I'm reading historical data:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import col

output_location = 'path'  # placeholder; the paths below assume it ends with '/'

def upsert_to_parquet(batchDF, batchId):
    # Extract partition keys (year, month, day) from batchDF
    # In case more than one partition is updated iterate over them
    partitions = batchDF.select('year', 'month', 'day').distinct().collect()
    for partition in partitions:
        year, month, day = partition['year'], partition['month'], partition['day']

        # Construct the target path based on partition keys
        partition_path = f'{output_location}data/year={year}/month={month}/day={day}'

        # Overwrite existing data in the target partition
        batchDF.filter((col('year') == year) &amp;amp; (col('month') == month) &amp;amp; (col('day') == day)).write.mode('overwrite').parquet(partition_path)


df.writeStream \
    .foreachBatch(upsert_to_parquet) \
    .option('checkpointLocation', output_location + 'checkpoint') \
    .trigger(availableNow = True) \
    .start()&lt;/LI-CODE&gt;&lt;P&gt;This appears to solve the duplication on updates. However, there are some additional issues:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Some extra log files appear in every partition directory: &lt;EM&gt;_committed_xxx, _started_xxx, _SUCCESS&lt;/EM&gt;&lt;/LI&gt;&lt;LI&gt;Each partition gets two .parquet files:&amp;nbsp;&lt;EM&gt;part-0000-xxx.snappy.parquet, part-0001-xxx.snappy.parquet&lt;/EM&gt;. The first is empty, containing only the schema information (column names), and the second contains all the data.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Ideally, only one Parquet file should be created. Do you know why this happens?&lt;/P&gt;</description>
      <pubDate>Sat, 16 Mar 2024 17:55:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63904#M32395</guid>
      <dc:creator>Jorge3</dc:creator>
      <dc:date>2024-03-16T17:55:17Z</dc:date>
    </item>
    <item>
      <title>Re: Dynamic partition overwrite with Streaming Data</title>
      <link>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63907#M32397</link>
      <description>&lt;P&gt;Why not migrate to Delta and just use MERGE inside foreachBatch?&lt;/P&gt;</description>
      <pubDate>Sat, 16 Mar 2024 19:11:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/63907#M32397</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2024-03-16T19:11:08Z</dc:date>
    </item>
    <item>
      <title>Re: Dynamic partition overwrite with Streaming Data</title>
      <link>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/72817#M34614</link>
      <description>&lt;P&gt;We had a similar situation,&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/25346"&gt;@Hubert-Dudek&lt;/a&gt;. We are using Delta, but we ran into problems when propagating updates via MERGE, because you can no longer read the resulting table as a streaming source. So using &lt;STRIKE&gt;complete&lt;/STRIKE&gt;&amp;nbsp;overwrite over Parquet partitions might be a good idea if you have to read the table as a streaming source.&lt;BR /&gt;&lt;BR /&gt;This can give you the best of both worlds: the resulting Parquet/Delta table will not have duplicated entries for each update (similar to Delta's MERGE), so if you read it as a static DataFrame it will be clean. You can also read the "updated" rows using streaming, although this is much less efficient than an update or merge.&lt;BR /&gt;&lt;BR /&gt;However, I do not understand how&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;'s response ensures that the data in the batch corresponds to a single partition and is not scattered among several partitions.&lt;/P&gt;</description>
      <pubDate>Wed, 12 Jun 2024 15:56:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dynamic-partition-overwrite-with-streaming-data/m-p/72817#M34614</guid>
      <dc:creator>JacintoArias</dc:creator>
      <dc:date>2024-06-12T15:56:07Z</dc:date>
    </item>
  </channel>
</rss>

