How to overwrite partition in DLT pipeline ?

Anuj_Diggibyte
Databricks MVP

I am trying to replicate my existing spark pipeline in DLT.

I am not able to achieve desired result using DLT .

Current pipeline :

source set up : CSV file ingested in bronze using SCP

frequency : monthly

bronze dir : /cntdlt/bronze/emp/year=2022 /month=1

silver : read bronze and perform some transformation and write in silver with same partition column

val df =spark.read.option(...).csv(s"{basepath}/year=${year}/month=${month}").withColumn("some logic","")
 
spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
val partitions= List("year","month" )
df.write.partitionBy(partitions:_*)
   .mode(saveMode)
   .option("path", silverpath)
    .saveAsTable(s"${dbName}.${tableName}")
spark.sql(s"msck repair table $dbName.$tableName")

Scenario: consider some source data issue if i want to reload data for some previous month in my spark pipleine set up i just run the job for specific year and month value (provided by airlfow)

and because of spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

it update the data in partition under processing .

I am trying to achieve similar set up using DLT pipeline .

DLT Setup

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date
from pyspark.sql import functions as F
 
schema='id int, \
date date,\
salary int,\
city string,\
name string,\
age int, \
year int, \
month int'
 
 
@dlt.create_table(
  comment="The raw employee, ingested from /databricks-datasets.",
  table_properties={
    "diggibyte_emp.quality": "bronze",
    "pipelines.autoOptimize.managed": "true",
     "delta.appendOnly":"false"
  }
)
def t_emp_raw():
  return (
     spark.readStream.format("cloudFiles")
      .option('cloudFiles.format', 'json')
      .option("partitionColumns", "year,month")  
      .option('cloudFiles.allowOverwrites', 'true') 
      .schema(schema)
      .load( '/mnt/cntdlt/bronze/emp/')
  )
 
 
 
@dlt.create_table(
  comment="The cleaned emp reocrd with partitioned by year, month",
  partition_cols = ["year", "month"],
  table_properties={
    "diggibyte_emp.quality": "silver",
    "pipelines.autoOptimize.managed": "true",
    "delta.appendOnly":"false"
  }
)
def t_emp_silver():
  return (
      dlt.readStream("t_emp_raw")
    .withColumn("load_date",current_date())
  )
 
 

pipeline conf

{
    "id": "8496fc69-5ee3-4d61-9db5-47a38f130785",
    "clusters": [
        {
            "label": "default",
            "num_workers": 1
        }
    ],
    "development": true,
    "continuous": false,
    "channel": "CURRENT",
    "edition": "ADVANCED",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/demo/notebook_employe"
            }
        }
    ],
    "name": "monthly_load_employe",
    "storage": "dbfs:/pipelines/8496fc69-5ee3-4d61-9db5-47a38f130785",
    "target": "org"
}

DLT pipeline :

I have run the pipeline when bronze is having only 1 partition.

/cntdlt/bronze/emp/year=2022 /month=1 [consist 10 record ]

result in

t_emp_raw for partition year=2022 /month=1 = 10 records

t_empl_silver for partition year=2022 /month=1 = 10 records

Added data in bronze source

/cntdlt/bronze/emp/year=2022 /month=2 [consist 10 records ]

result

t_emp_raw for partition year=2022 /month=2 = 10 records

t_empl_silver for partition year=2022 /month=2 = 10 records

Now consider ,source has send updated data for 1 month then

my partition on 01 need to be update and overwritten

/cntdlt/bronze/emp/year=2022 /month=1 [consist 5 records ]

expected result

t_emp_raw for partition year=2022 /month=1/ : 5 records

t_empl_silver for partition year=2022 /month=1 / : 5 records

Actual result

t_emp_raw for partition year=2022 /month=1/ : 15 records

t_empl_silver for partition year=2022 /month=1/ :15 records

How can i achieve the overwriting of partition when it is updated?