<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to overwrite partition in DLT pipeline? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-overwrite-partition-in-dlt-pipeline/m-p/34149#M24927</link>
    <description>&lt;P&gt;What I have observed:&lt;/P&gt;&lt;P&gt;@dlt.table with a spark.read or dlt.read will create the table in mode=overwrite.&lt;/P&gt;&lt;P&gt;@dlt.table with a spark.readStream or dlt.readStream will append new data.&lt;/P&gt;&lt;P&gt;To get updates applied instead of appended, use &lt;A href="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html?_ga=2.262791500.1125617558.1661773589-152423241.1656331743" target="_blank"&gt;CDC: Change data capture with Delta Live Tables | Databricks on AWS&lt;/A&gt;.&lt;/P&gt;</description>
    <pubDate>Tue, 30 Aug 2022 15:33:45 GMT</pubDate>
    <dc:creator>kfoster</dc:creator>
    <dc:date>2022-08-30T15:33:45Z</dc:date>
    <item>
      <title>How to overwrite partition in DLT pipeline?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-overwrite-partition-in-dlt-pipeline/m-p/34148#M24926</link>
      <description>&lt;P&gt;I am trying to replicate my existing Spark pipeline in DLT, but I am not able to achieve the desired result with DLT.&lt;/P&gt;&lt;P&gt;Current pipeline:&lt;/P&gt;&lt;P&gt;&lt;I&gt;Source setup: CSV files ingested into bronze via SCP&lt;/I&gt;&lt;/P&gt;&lt;P&gt;&lt;I&gt;Frequency: monthly&lt;/I&gt;&lt;/P&gt;&lt;P&gt;&lt;I&gt;Bronze dir: /cntdlt/bronze/emp/year=2022/month=1&lt;/I&gt;&lt;/P&gt;&lt;P&gt;Silver: read bronze, perform some transformations, and write to silver with the same partition columns:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val df = spark.read.option(...).csv(s"${basepath}/year=${year}/month=${month}").withColumn("some logic", "")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
val partitions = List("year", "month")
df.write.partitionBy(partitions: _*)
  .mode(saveMode)
  .option("path", silverpath)
  .saveAsTable(s"${dbName}.${tableName}")
spark.sql(s"msck repair table $dbName.$tableName")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Scenario: if there is a source data issue and I want to reload data for some previous month, in my Spark pipeline setup I just run the job for the specific year and month values (provided by Airflow),&lt;/P&gt;&lt;P&gt;and because of &lt;B&gt;spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")&lt;/B&gt;&lt;/P&gt;&lt;P&gt;it updates the data only in the partition being processed.&lt;/P&gt;&lt;P&gt;I am trying to achieve a similar setup using a DLT pipeline.&lt;/P&gt;&lt;P&gt;DLT setup:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date
from pyspark.sql import functions as F
schema='id int, \
date date,\
salary int,\
city string,\
name string,\
age int, \
year int, \
month int'
@dlt.create_table(
  comment="The raw employee records, ingested from /mnt/cntdlt/bronze/emp/.",
  table_properties={
    "diggibyte_emp.quality": "bronze",
    "pipelines.autoOptimize.managed": "true",
    "delta.appendOnly": "false"
  }
)
def t_emp_raw():
  return (
     spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.partitionColumns", "year,month")
      .option("cloudFiles.allowOverwrites", "true")
      .schema(schema)
      .load("/mnt/cntdlt/bronze/emp/")
  )
@dlt.create_table(
  comment="The cleaned emp records, partitioned by year and month",
  partition_cols = ["year", "month"],
  table_properties={
    "diggibyte_emp.quality": "silver",
    "pipelines.autoOptimize.managed": "true",
    "delta.appendOnly":"false"
  }
)
def t_emp_silver():
  return (
      dlt.readStream("t_emp_raw")
        .withColumn("load_date", current_date())
  )
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Pipeline conf:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;{
    "id": "8496fc69-5ee3-4d61-9db5-47a38f130785",
    "clusters": [
        {
            "label": "default",
            "num_workers": 1
        }
    ],
    "development": true,
    "continuous": false,
    "channel": "CURRENT",
    "edition": "ADVANCED",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/demo/notebook_employe"
            }
        }
    ],
    "name": "monthly_load_employe",
    "storage": "dbfs:/pipelines/8496fc69-5ee3-4d61-9db5-47a38f130785",
    "target": "org"
}&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;DLT pipeline:&lt;/P&gt;&lt;P&gt;I ran the pipeline when bronze had only one partition:&lt;/P&gt;&lt;P&gt;&lt;I&gt;/cntdlt/bronze/emp/year=2022/month=1&lt;/I&gt; [contains 10 records]&lt;/P&gt;&lt;P&gt;Result:&lt;/P&gt;&lt;P&gt;t_emp_raw for partition year=2022/month=1: 10 records&lt;/P&gt;&lt;P&gt;t_emp_silver for partition year=2022/month=1: 10 records&lt;/P&gt;&lt;P&gt;Then I added data to the bronze source:&lt;/P&gt;&lt;P&gt;&lt;I&gt;/cntdlt/bronze/emp/year=2022/month=2&lt;/I&gt; [contains 10 records]&lt;/P&gt;&lt;P&gt;Result:&lt;/P&gt;&lt;P&gt;t_emp_raw for partition year=2022/month=2: 10 records&lt;/P&gt;&lt;P&gt;t_emp_silver for partition year=2022/month=2: 10 records&lt;/P&gt;&lt;P&gt;Now suppose the source sends updated data for month 1; the month=1 partition needs to be overwritten:&lt;/P&gt;&lt;P&gt;&lt;I&gt;/cntdlt/bronze/emp/year=2022/month=1&lt;/I&gt; [contains 5 records]&lt;/P&gt;&lt;P&gt;Expected result:&lt;/P&gt;&lt;P&gt;t_emp_raw for partition year=2022/month=1: 5 records&lt;/P&gt;&lt;P&gt;t_emp_silver for partition year=2022/month=1: 5 records&lt;/P&gt;&lt;P&gt;Actual result:&lt;/P&gt;&lt;P&gt;t_emp_raw for partition year=2022/month=1: 15 records&lt;/P&gt;&lt;P&gt;t_emp_silver for partition year=2022/month=1: 15 records&lt;/P&gt;&lt;P&gt;How can I overwrite a partition when its source data is updated?&lt;/P&gt;</description>
      <pubDate>Sun, 21 Aug 2022 10:10:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-overwrite-partition-in-dlt-pipeline/m-p/34148#M24926</guid>
      <dc:creator>Anuj_Diggibyte</dc:creator>
      <dc:date>2022-08-21T10:10:52Z</dc:date>
    </item>
    <item>
      <title>Re: How to overwrite partition in DLT pipeline?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-overwrite-partition-in-dlt-pipeline/m-p/34149#M24927</link>
      <description>&lt;P&gt;What I have observed:&lt;/P&gt;&lt;P&gt;@dlt.table with a spark.read or dlt.read will create the table in mode=overwrite.&lt;/P&gt;&lt;P&gt;@dlt.table with a spark.readStream or dlt.readStream will append new data.&lt;/P&gt;&lt;P&gt;To get updates applied instead of appended, use &lt;A href="https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html?_ga=2.262791500.1125617558.1661773589-152423241.1656331743" target="_blank"&gt;CDC: Change data capture with Delta Live Tables | Databricks on AWS&lt;/A&gt;.&lt;/P&gt;</description>
      <pubDate>Tue, 30 Aug 2022 15:33:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-overwrite-partition-in-dlt-pipeline/m-p/34149#M24927</guid>
      <dc:creator>kfoster</dc:creator>
      <dc:date>2022-08-30T15:33:45Z</dc:date>
    </item>
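The CDC approach the reply links to can be sketched roughly as below. This is a hedged sketch, not the poster's solution: it only runs inside a Delta Live Tables pipeline (the dlt module is not importable elsewhere), and the target table name, key column, and sequencing column are assumptions for illustration.

```python
# Sketch of the linked CDC approach: dlt.apply_changes upserts changed rows
# into the target by key instead of blindly appending. Runs only inside a
# DLT pipeline; table, key, and sequence column names are illustrative.
import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("t_emp_silver_cdc")  # target of the upserts

dlt.apply_changes(
    target="t_emp_silver_cdc",
    source="t_emp_raw",            # stream of change records from bronze
    keys=["id"],                   # rows sharing a key are updated, not appended
    sequence_by=col("load_date"),  # assumed ordering column: latest wins per key
)
```

With this pattern, a re-delivered month updates existing rows by key rather than stacking 5 new records on top of the original 10, which addresses the 15-record "actual result" in the question.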
  </channel>
</rss>

