Hello All,
I have been facing this issue for a long time and so far have found no solution. I have a Delta table, and my bronze layer randomly picks up old files (mostly files around 8 days old). My source files are in Azure Blob Storage.
Those files are not being updated or added again. Because of this, my bronze layer ends up with lots of duplicate records, which I have to clean up manually.
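For context, this is roughly the cleanup I currently run by hand. It is only a sketch: key_col_1 and key_col_2 stand in for my real business key columns.
%python
# Hypothetical version of the manual cleanup: keep one copy per business key
# (key_col_1/key_col_2 are placeholders) and overwrite the bronze table.
from pyspark.sql import functions as F, Window

bronze = spark.read.format("delta").load(bronzePath)
w = Window.partitionBy("key_col_1", "key_col_2").orderBy("RecordCreateDate")
deduped = (bronze
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn"))
# Delta's snapshot isolation allows overwriting a table while reading from it.
deduped.write.format("delta").mode("overwrite").save(bronzePath)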
This is the code I am using. It runs in a Databricks notebook whose default language is SQL, so the Python cells carry a %python magic:
%python
source_dir = "dbfs:/mnt/blobstorage/xyz/abcd"
basePath = "dbfs:/user/hive/warehouse/insight/abcd"
sourcePath = source_dir + "/source"
bronzePath = basePath + "/bronze"
silverPath = basePath + "/silver"
goldPath = basePath + "/gold"
checkpointPath = basePath + "/checkpoints"
%python
# read files from blob storage into the raw_temp view
(spark.readStream
    .format("csv")
    .schema("table schema as string")  # the real schema DDL string is omitted from this post
    .load(sourcePath)
    .selectExpr("*", "_metadata as source_metadata")
    .createOrReplaceTempView("raw_temp"))
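The real call passes a DDL string, something along these lines (the columns here are illustrative only):
%python
# Illustrative stand-in for the schema string passed to .schema() above;
# my actual column list is longer.
schema_ddl = "OrderID STRING, CustomerID STRING, OrderDate TIMESTAMP, Amount DOUBLE"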
create or replace temporary view bronze_temp as (
  select
    uuid() as RecordID,
    *,
    0 as IsDeleted,
    current_timestamp() as RecordCreateDate,
    current_user() as RecordCreateBy,
    current_timestamp() as RecordModifyDate,
    current_user() as RecordModifyBy
  from raw_temp
)
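As a stopgap I have considered deduplicating inside the stream itself before the write (the write below would then read from deduped_stream instead of bronze_temp), but without a watermark the dedup state in the checkpoint grows unboundedly, so I am not sure it is the right fix:
%python
# Hypothetical stopgap: drop rows whose business key this stream has already
# seen; key_col_1/key_col_2 are placeholders for my real key columns.
deduped_stream = spark.table("bronze_temp").dropDuplicates(["key_col_1", "key_col_2"])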
%python
# write the streaming view from the CSV blob store to the Delta bronze table
(spark.table("bronze_temp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .trigger(once=True)
    .start(bronzePath)
)
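One thing I noticed while writing this up: the re-ingested files being mostly about 8 days old lines up with the file source's maxFileAge option, whose default is 7 days. As far as I understand, the source stops tracking files older than that, so I wonder whether old files surfacing again in the blob listing end up looking new. If that is the cause, a tweak like the sketch below might help, but I have not verified it (schema_ddl is the illustrative schema string from above):
%python
# Hypothetical, unverified tweak: widen the window during which the file
# source remembers already-processed files (the default is "7d").
(spark.readStream
    .format("csv")
    .schema(schema_ddl)
    .option("maxFileAge", "30d")
    .load(sourcePath)
    .createOrReplaceTempView("raw_temp"))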
Appreciate any help with this.
Thanks