Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to overwrite a partition in a DLT pipeline?

anujsen18
New Contributor

I am trying to replicate my existing Spark pipeline in DLT, but I am not able to achieve the desired result.

Current pipeline:

Source setup: CSV files ingested into bronze via SCP

Frequency: monthly

Bronze dir: /cntdlt/bronze/emp/year=2022/month=1

Silver: read bronze, apply some transformations, and write to silver with the same partition columns

import org.apache.spark.sql.functions.lit

// Read only the partition being (re)processed
val df = spark.read.option(...).csv(s"${basepath}/year=${year}/month=${month}")
  .withColumn("derived_col", lit(""))  // placeholder for the real transformation logic

spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName")

// Dynamic mode: an overwrite replaces only the partitions present in df
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val partitions = List("year", "month")
df.write.partitionBy(partitions: _*)
  .mode(saveMode)
  .option("path", silverpath)
  .saveAsTable(s"${dbName}.${tableName}")

// Register partitions written directly to storage with the metastore
spark.sql(s"msck repair table $dbName.$tableName")

Scenario: suppose a source data issue means I need to reload data for some previous month. In my Spark pipeline setup I just rerun the job for the specific year and month values (provided by Airflow), and because of

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

only the partition under processing is overwritten.
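Here is a minimal PySpark sketch of that behavior (the basePath option, the header option, and the target table name are illustrative assumptions, not part of the original job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Dynamic mode: INSERT OVERWRITE replaces only the partitions that appear
# in the incoming DataFrame; all other partitions are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df = (spark.read
        .option("basePath", "/mnt/cntdlt/bronze/emp")  # keeps year/month as columns
        .option("header", "true")
        .csv("/mnt/cntdlt/bronze/emp/year=2022/month=1"))

# Assumes the target table already exists, partitioned by (year, month);
# insertInto matches columns by position, so order df's columns accordingly.
df.write.insertInto("org.t_emp_silver", overwrite=True)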

I am trying to achieve a similar setup using a DLT pipeline.

DLT Setup

import dlt
from pyspark.sql import functions as F

schema = """
  id int,
  date date,
  salary int,
  city string,
  name string,
  age int,
  year int,
  month int
"""
 
 
@dlt.create_table(
  comment="The raw employee records, ingested from the bronze landing path.",
  table_properties={
    "diggibyte_emp.quality": "bronze",
    "pipelines.autoOptimize.managed": "true",
    "delta.appendOnly": "false"
  }
)
def t_emp_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("partitionColumns", "year,month")      # derive year/month from the directory layout
      .option("cloudFiles.allowOverwrites", "true")  # re-ingest files that are rewritten in place
      .schema(schema)
      .load("/mnt/cntdlt/bronze/emp/")
  )
 
 
 
@dlt.create_table(
  comment="The cleaned emp records, partitioned by year and month.",
  partition_cols=["year", "month"],
  table_properties={
    "diggibyte_emp.quality": "silver",
    "pipelines.autoOptimize.managed": "true",
    "delta.appendOnly": "false"
  }
)
def t_emp_silver():
  return (
    dlt.read_stream("t_emp_raw")   # streaming read: each update appends to the target
      .withColumn("load_date", F.current_date())
  )
 
 

Pipeline config:

{
    "id": "8496fc69-5ee3-4d61-9db5-47a38f130785",
    "clusters": [
        {
            "label": "default",
            "num_workers": 1
        }
    ],
    "development": true,
    "continuous": false,
    "channel": "CURRENT",
    "edition": "ADVANCED",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/demo/notebook_employe"
            }
        }
    ],
    "name": "monthly_load_employe",
    "storage": "dbfs:/pipelines/8496fc69-5ee3-4d61-9db5-47a38f130785",
    "target": "org"
}
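The legacy job took year/month from Airflow per run; with DLT the closest operational handle is starting a pipeline update against the id above via the Pipelines REST API. A minimal sketch (host, token, and the helper name are placeholders; note that full_refresh=True reprocesses all source data rather than a single partition):

import requests

def start_update(host: str, token: str, pipeline_id: str, full_refresh: bool = False) -> str:
    # POST /api/2.0/pipelines/{id}/updates starts a pipeline update;
    # full_refresh=True truncates the tables and reprocesses everything.
    resp = requests.post(
        f"{host}/api/2.0/pipelines/{pipeline_id}/updates",
        headers={"Authorization": f"Bearer {token}"},
        json={"full_refresh": full_refresh},
    )
    resp.raise_for_status()
    return resp.json()["update_id"]

# e.g. from an Airflow task:
# start_update("https://<workspace-url>", "<token>",
#              "8496fc69-5ee3-4d61-9db5-47a38f130785", full_refresh=True)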

DLT pipeline run:

I ran the pipeline when bronze had only one partition:

/cntdlt/bronze/emp/year=2022/month=1 [contains 10 records]

Result:

t_emp_raw for partition year=2022/month=1: 10 records

t_emp_silver for partition year=2022/month=1: 10 records

Then I added data to the bronze source:

/cntdlt/bronze/emp/year=2022/month=2 [contains 10 records]

Result:

t_emp_raw for partition year=2022/month=2: 10 records

t_emp_silver for partition year=2022/month=2: 10 records

Now suppose the source sends corrected data for month 1, so that partition needs to be overwritten:

/cntdlt/bronze/emp/year=2022/month=1 [contains 5 records]

Expected result:

t_emp_raw for partition year=2022/month=1: 5 records

t_emp_silver for partition year=2022/month=1: 5 records

Actual result:

t_emp_raw for partition year=2022/month=1: 15 records

t_emp_silver for partition year=2022/month=1: 15 records

How can I overwrite a partition when its source data is updated?

2 REPLIES

kfoster
Contributor

What I have observed:

@dlt.table with a spark.read or dlt.read will create the table in overwrite mode (the table is fully recomputed on each pipeline update).

@dlt.table with a spark.readStream or dlt.read_stream will append new data.
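For example, a minimal sketch of the batch variant (the function name is illustrative; the path and schema are taken from the question), which rewrites the table end-to-end on each update so a corrected bronze partition simply replaces the stale data:

import dlt
from pyspark.sql import functions as F

@dlt.table(
  comment="Silver recomputed in full on every pipeline update",
  partition_cols=["year", "month"]
)
def t_emp_silver_batch():
  # Batch (non-streaming) read: DLT recomputes the whole table each update.
  return (
    spark.read
      .schema("id int, date date, salary int, city string, "
              "name string, age int, year int, month int")
      .json("/mnt/cntdlt/bronze/emp/")
      .withColumn("load_date", F.current_date())
  )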

To get updates applied in place, use CDC: Change data capture with Delta Live Tables | Databricks on AWS
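A minimal sketch of that apply_changes approach (the key column, the sequencing view, and the SCD type are assumptions here; exact helper names vary slightly across DLT releases):

import dlt
from pyspark.sql import functions as F

@dlt.view
def v_emp_changes():
  # Stamp each micro-batch so the newest reload wins during the merge.
  return dlt.read_stream("t_emp_raw").withColumn("load_ts", F.current_timestamp())

# Declare the target table that apply_changes will maintain.
dlt.create_streaming_table(
  name="t_emp_silver",
  partition_cols=["year", "month"]
)

dlt.apply_changes(
  target="t_emp_silver",
  source="v_emp_changes",
  keys=["id"],                   # assumed business key; adjust to the data
  sequence_by=F.col("load_ts"),  # latest load wins for each key
  stored_as_scd_type=1           # keep only the current version of each row
)

Note that an upsert by key updates and inserts rows but does not remove rows that disappear from a reloaded partition (the 15-vs-5 case above); removing them requires sending explicit delete records and wiring up apply_as_deletes.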

Ok, I will. Thanks for helping me out. I wanted to know how to overwrite a partition in a DLT pipeline, and this answered it.
