cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

duplicate files in bronze delta table

hps2
New Contributor

Hello All,

I am facing this issue from long time but so far there is no solution. I have delta table. My bronze layer is picking up the old files (mostly 8 days old file) randomly. My source of files is azure blob storage.

Those files and not being updated or added again. Because of this my bronze layer is having lots of duplicate records, which i have to clean up manually.

This is the code i m using:

 

source_dir = "dbfs:/mnt/blobstorage/xyz/abcd"
basePath = "dbfs:/user/hive/warehouse/insight/abcd"

sourcePath      = source_dir + "/source"
bronzePath      = basePath + "/bronze"
silverPath      = basePath + "/silver"
goldPath        = basePath + "/gold"
checkpointPath  = basePath + "/checkpoints"
%python
#read file from blob store to view raw_temp
(spark.readStream
    .format("csv")
    .schema("table schema as string")
    .load(sourcePath)
    .selectExpr("*", "_metadata as source_metadata")
    .createOrReplaceTempView("raw_temp"))
 
create or replace temporary view bronze_temp as (
select
  uuid() as  RecordID,
  *,
  0 as IsDeleted,
  getdate() as RecordCreateDate,
  current_user() as RecordCreateBy,
  getdate() as RecordModifyDate,
  current_user() as RecordModifyBy
from raw_temp
)
 
 
%python
#write streaming view from csv blob store to delta bronze
(spark.table("bronze_temp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .trigger(once=True)
    .start(bronzePath)
)
 
Appreciate any help with this. 
 
Thanks
1 REPLY 1

Kaniz
Community Manager
Community Manager

Hi @hps2

 

File Ingestion and Metadata Capture:

  • When ingesting data into your bronze layer, consider capturing metadata information such as ingestion timestamps and source file names. This practice helps track the origin of data and can be useful for debugging.
  • Ensure that your pipeline reads from the correct sources and writes to the intended destination locations.

Handling Changed Rows:

  • Since your files are not being updated or added again, it’s essential to handle changed rows during repetitive ingestions.
  • There are a couple of common approaches:
    • Updating Changed Rows: Use upsert (merge) functionality to update changed rows. This ensures that the target table contains a single version of each row, reflecting the latest state of the corresponding source row. You’ll need to provide primary keys for each table.
    • Appending Changed Rows as New Versions: Preserve change history by appending changed source rows into the target table as new versions. While this approach simplifies ingestion design, it requires deduplication in subsequent phases.

Cleaning Up Old Files:

  • To prevent old files from being picked up, consider implementing a cleanup process. 

I hope these suggestions help you resolve the issue and improve your bronze layer’s data ingestion process! 🚀

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.