Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

duplicate files in bronze delta table

hps2
New Contributor

Hello All,

I have been facing this issue for a long time and have not found a solution so far. I have a Delta table, and my bronze layer randomly picks up old files (mostly files that are about 8 days old). The source of the files is Azure Blob Storage.

These files are not being updated or added again. Because of this, my bronze layer ends up with a lot of duplicate records, which I have to clean up manually.

This is the code I am using:

 

source_dir = "dbfs:/mnt/blobstorage/xyz/abcd"
basePath = "dbfs:/user/hive/warehouse/insight/abcd"

sourcePath      = source_dir + "/source"
bronzePath      = basePath + "/bronze"
silverPath      = basePath + "/silver"
goldPath        = basePath + "/gold"
checkpointPath  = basePath + "/checkpoints"
%python
#read file from blob store to view raw_temp
(spark.readStream
    .format("csv")
    .schema("table schema as string")
    .load(sourcePath)
    .selectExpr("*", "_metadata as source_metadata")
    .createOrReplaceTempView("raw_temp"))
 
%sql
-- add audit columns on top of the raw streaming view
create or replace temporary view bronze_temp as (
select
  uuid() as  RecordID,
  *,
  0 as IsDeleted,
  getdate() as RecordCreateDate,
  current_user() as RecordCreateBy,
  getdate() as RecordModifyDate,
  current_user() as RecordModifyBy
from raw_temp
)
 
 
%python
#write streaming view from csv blob store to delta bronze
(spark.table("bronze_temp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .trigger(once=True)
    .start(bronzePath)
)
 
Appreciate any help with this. 
 
Thanks
1 REPLY

Kaniz_Fatma
Community Manager

Hi @hps2

 

File Ingestion and Metadata Capture:

  • When ingesting data into your bronze layer, consider capturing metadata information such as ingestion timestamps and source file names. This practice helps track the origin of data and can be useful for debugging.
  • Ensure that your pipeline reads from the correct sources and writes to the intended destination locations.
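
As a rough sketch of how captured metadata can help with debugging here, the query below lists source files whose rows reached the bronze table in more than one run. It assumes the `source_metadata` column from the question's code (the `_metadata` struct, whose `file_path` and `file_modification_time` fields depend on the runtime version) and that all rows written in one run share the same `RecordCreateDate`. The helper name `duplicate_ingestion_query` is made up for illustration.

```python
def duplicate_ingestion_query(bronze_path: str) -> str:
    """Return Spark SQL that lists source files ingested in more than one run.

    Assumes the bronze table keeps the `_metadata` struct in a column named
    `source_metadata` and that each run stamps its rows with one RecordCreateDate.
    """
    return f"""
        SELECT source_metadata.file_path                              AS file_path,
               COUNT(DISTINCT RecordCreateDate)                       AS ingestion_runs,
               COUNT(DISTINCT source_metadata.file_modification_time) AS distinct_mod_times,
               COUNT(*)                                               AS row_count
        FROM delta.`{bronze_path}`
        GROUP BY source_metadata.file_path
        HAVING COUNT(DISTINCT RecordCreateDate) > 1
        ORDER BY file_path
    """


query = duplicate_ingestion_query("dbfs:/user/hive/warehouse/insight/abcd/bronze")
print(query)
# In a notebook: spark.sql(query).show(truncate=False)
```

If `distinct_mod_times` is greater than 1 for a file, its modification time changed in storage between runs, which is worth checking before looking at the pipeline itself.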

Handling Changed Rows:

  • Even though your source files are not being updated or added again, the same rows are reaching the bronze table more than once, so the ingestion needs a way to handle rows it has already seen.
  • There are a couple of common approaches:
    • Updating Changed Rows: Use upsert (merge) functionality to update changed rows. This ensures that the target table contains a single version of each row, reflecting the latest state of the corresponding source row. You’ll need to provide primary keys for each table.
    • Appending Changed Rows as New Versions: Preserve change history by appending changed source rows into the target table as new versions. While this approach simplifies ingestion design, it requires deduplication in subsequent phases.
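
The first approach above could look roughly like the sketch below: an insert-only merge run from `foreachBatch`, so rows whose key already exists in bronze are skipped. The key columns `OrderID` and `LineNo` and the helper names are hypothetical; the real keys have to come from the data itself. Note that `RecordID` from the question cannot serve as a key, because `uuid()` produces a new value every time a row is ingested. The sketch also assumes the Delta table already exists at the bronze path.

```python
from typing import Callable, Sequence


def merge_condition(keys: Sequence[str]) -> str:
    """Build a null-safe join condition between target `t` and source `s`."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"t.`{k}` <=> s.`{k}`" for k in keys)


def make_insert_only_upsert(spark, bronze_path: str, keys: Sequence[str]) -> Callable:
    """Return a foreachBatch handler that inserts only rows not already in bronze."""
    # Imported lazily so merge_condition can be used without Delta installed.
    from delta.tables import DeltaTable

    condition = merge_condition(keys)

    def upsert(batch_df, batch_id: int) -> None:
        # Drop repeats inside the micro-batch, then skip rows already present.
        deduped = batch_df.dropDuplicates(list(keys))
        (DeltaTable.forPath(spark, bronze_path).alias("t")
            .merge(deduped.alias("s"), condition)
            .whenNotMatchedInsertAll()
            .execute())

    return upsert


print(merge_condition(["OrderID", "LineNo"]))
# t.`OrderID` <=> s.`OrderID` AND t.`LineNo` <=> s.`LineNo`

# In a notebook, replacing the plain append from the question:
# (spark.table("bronze_temp").writeStream
#     .foreachBatch(make_insert_only_upsert(spark, bronzePath, ["OrderID", "LineNo"]))
#     .option("checkpointLocation", checkpointPath)
#     .trigger(once=True)
#     .start())
```

This trades the simplicity of a plain append for a merge on every batch, so it is probably only worth it if the duplicates cannot be stopped at the source.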

Cleaning Up Old Files:

  • To prevent old files from being picked up, consider implementing a cleanup process that archives or deletes source files once they have been ingested.
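
One possible form of such a cleanup, sketched under assumptions: Spark's file stream source accepts the `cleanSource` and `sourceArchiveDir` options, which move (or delete) files after they have been processed. The archive location below is hypothetical, the helper name `cleanup_options` is made up, and whether this behaves well on a mounted blob container with a run-once trigger should be verified on a test folder first, since the cleanup is best effort.

```python
def cleanup_options(archive_dir: str) -> dict:
    """Options for Spark's file stream source that archive files after processing."""
    return {
        "cleanSource": "archive",         # or "delete"; the default is "off"
        "sourceArchiveDir": archive_dir,  # keep it outside the source path
    }


# Hypothetical archive folder next to the source folder from the question.
opts = cleanup_options("dbfs:/mnt/blobstorage/xyz/abcd/archive")
print(opts)

# In a notebook, added to the reader from the question:
# (spark.readStream
#     .format("csv")
#     .schema("table schema as string")
#     .options(**opts)
#     .load(sourcePath))
```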

I hope these suggestions help you resolve the issue and improve your bronze layer’s data ingestion process! 🚀
