I am trying to replicate my existing Spark pipeline in DLT, but I am not able to achieve the desired result with DLT.
Current pipeline:
Source setup: CSV files ingested into bronze using SCP
Frequency: monthly
Bronze dir: /cntdlt/bronze/emp/year=2022/month=1
Silver: read bronze, apply some transformations, and write to silver with the same partition columns
val df = spark.read
  .option(...)                                     // reader options elided
  .csv(s"${basepath}/year=${year}/month=${month}")
  .withColumn("some logic", "")                    // placeholder for the real transformations

spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val partitions = List("year", "month")

df.write.partitionBy(partitions: _*)
  .mode(saveMode)
  .option("path", silverpath)
  .saveAsTable(s"${dbName}.${tableName}")

spark.sql(s"msck repair table $dbName.$tableName")
Scenario: if there is a source data issue and I want to reload data for some previous month, in my Spark pipeline setup I just run the job for the specific year and month values (provided by Airflow), and because of spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") it overwrites only the data in the partition being processed.
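For reference, a minimal PySpark sketch of that reload path (a Python rendering of the Scala job above; the paths, database and table names below are placeholders rather than the real ones, and year/month come from Airflow):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Dynamic mode: "overwrite" replaces only the partitions present in the DataFrame being written.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

year, month = 2022, 1                         # partition to reload, passed in by Airflow
base_path = "/mnt/cntdlt/bronze/emp"          # bronze root
silver_path = "/mnt/cntdlt/silver/emp"        # placeholder silver location
db_name, table_name = "emp_db", "emp_silver"  # placeholder database/table names

df = (
    spark.read
    .option("header", "true")                 # placeholder for the real reader options
    .option("basePath", base_path)            # so year/month are recovered as partition columns
    .csv(f"{base_path}/year={year}/month={month}")
)

# Only year=2022/month=1 is rewritten in the silver table; all other partitions stay untouched.
(
    df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .option("path", silver_path)
    .saveAsTable(f"{db_name}.{table_name}")
)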
I am trying to achieve a similar setup using a DLT pipeline.
DLT Setup
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date
from pyspark.sql import functions as F
schema = 'id int, \
date date, \
salary int, \
city string, \
name string, \
age int, \
year int, \
month int'
@dlt.create_table(
    comment="The raw employee data, ingested from /mnt/cntdlt/bronze/emp/.",
    table_properties={
        "diggibyte_emp.quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
        "delta.appendOnly": "false"
    }
)
def t_emp_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.partitionColumns", "year,month")
        .option("cloudFiles.allowOverwrites", "true")
        .schema(schema)
        .load("/mnt/cntdlt/bronze/emp/")
    )
@dlt.create_table(
    comment="The cleaned employee records, partitioned by year and month",
    partition_cols=["year", "month"],
    table_properties={
        "diggibyte_emp.quality": "silver",
        "pipelines.autoOptimize.managed": "true",
        "delta.appendOnly": "false"
    }
)
def t_emp_silver():
    return (
        dlt.readStream("t_emp_raw")
        .withColumn("load_date", current_date())
    )
pipeline conf
{
  "id": "8496fc69-5ee3-4d61-9db5-47a38f130785",
  "clusters": [
    {
      "label": "default",
      "num_workers": 1
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "/demo/notebook_employe"
      }
    }
  ],
  "name": "monthly_load_employe",
  "storage": "dbfs:/pipelines/8496fc69-5ee3-4d61-9db5-47a38f130785",
  "target": "org"
}
DLT pipeline:
I ran the pipeline when bronze had only one partition.
/cntdlt/bronze/emp/year=2022/month=1 [contains 10 records]
Result:
t_emp_raw for partition year=2022/month=1 = 10 records
t_emp_silver for partition year=2022/month=1 = 10 records
Then I added data to the bronze source:
/cntdlt/bronze/emp/year=2022/month=2 [contains 10 records]
Result:
t_emp_raw for partition year=2022/month=2 = 10 records
t_emp_silver for partition year=2022/month=2 = 10 records
Now consider that the source has sent updated data for month 1, so my month=1 partition needs to be updated and overwritten:
/cntdlt/bronze/emp/year=2022/month=1 [contains 5 records]
Expected result:
t_emp_raw for partition year=2022/month=1 : 5 records
t_emp_silver for partition year=2022/month=1 : 5 records
Actual result:
t_emp_raw for partition year=2022/month=1 : 15 records
t_emp_silver for partition year=2022/month=1 : 15 records
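For reference, the per-partition counts above were checked with a query along these lines (a sketch; the org schema name comes from the "target" setting in the pipeline config):

# Count records per partition in the published silver table.
spark.sql("""
    SELECT year, month, COUNT(*) AS record_count
    FROM org.t_emp_silver
    GROUP BY year, month
    ORDER BY year, month
""").show()
# After the month=1 reload this shows 15 records for year=2022/month=1 instead of the expected 5.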
How can I achieve overwriting of a partition when its data is updated?