Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

duplicate files in bronze delta table

hps2
New Contributor II

Hello All,

I have been facing this issue for a long time, but so far there is no solution. I have a Delta table, and my bronze layer is randomly picking up old files (mostly files around 8 days old). My source of files is Azure Blob Storage.

Those files are not being updated or added again. Because of this, my bronze layer ends up with lots of duplicate records, which I have to clean up manually.

This is the code I am using:

 

source_dir = "dbfs:/mnt/blobstorage/xyz/abcd"
basePath = "dbfs:/user/hive/warehouse/insight/abcd"

sourcePath      = source_dir + "/source"
bronzePath      = basePath + "/bronze"
silverPath      = basePath + "/silver"
goldPath        = basePath + "/gold"
checkpointPath  = basePath + "/checkpoints"
%python
#read file from blob store to view raw_temp
(spark.readStream
    .format("csv")
    .schema("table schema as string")
    .load(sourcePath)
    .selectExpr("*", "_metadata as source_metadata")
    .createOrReplaceTempView("raw_temp"))
 
create or replace temporary view bronze_temp as (
select
  uuid() as RecordID,
  *,
  0 as IsDeleted,
  getdate() as RecordCreateDate,
  current_user() as RecordCreateBy,
  getdate() as RecordModifyDate,
  current_user() as RecordModifyBy
from raw_temp
)
 
 
%python
#write streaming view from csv blob store to delta bronze
(spark.table("bronze_temp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .trigger(once=True)
    .start(bronzePath)
)
 
Appreciate any help with this. 
 
Thanks
1 REPLY

Kaniz_Fatma
Community Manager

Hi @hps2

 

File Ingestion and Metadata Capture:

  • When ingesting data into your bronze layer, consider capturing metadata such as ingestion timestamps and source file names; this helps track the origin of each row and makes duplicates easier to debug. A minimal sketch follows this list.
  • Ensure that your pipeline reads from the correct sources and writes to the intended destination locations.
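
Here is a minimal PySpark sketch of capturing that metadata (the column names source_file and ingest_time are illustrative, and the _metadata column assumes a Databricks Runtime recent enough to expose it for file sources):

%python
from pyspark.sql import functions as F

# Read the CSV stream and tag each row with its originating file and an ingestion timestamp.
raw = (spark.readStream
    .format("csv")
    .schema("table schema as string")   # substitute the real schema string
    .load(sourcePath)
    .withColumn("source_file", F.col("_metadata.file_path"))
    .withColumn("ingest_time", F.current_timestamp()))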

Handling Changed Rows:

  • Even if your source files are not being updated or added again, it’s important to handle rows that reappear across repeated ingestions so they don’t accumulate as duplicates.
  • There are a couple of common approaches:
    • Updating Changed Rows: Use upsert (merge) functionality to update changed rows. This ensures that the target table contains a single version of each row, reflecting the latest state of the corresponding source row. You’ll need to provide primary keys for each table; a sketch of this approach follows the list.
    • Appending Changed Rows as New Versions: Preserve change history by appending changed source rows into the target table as new versions. While this approach simplifies ingestion design, it requires deduplication in subsequent phases.
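
A minimal sketch of the merge approach using foreachBatch, which would replace the plain append write (the primary-key column id is hypothetical; substitute your table’s real key):

%python
from delta.tables import DeltaTable

bronze = DeltaTable.forPath(spark, bronzePath)

def upsert_batch(microbatch_df, batch_id):
    # Deduplicate within the micro-batch first so MERGE sees one row per key.
    deduped = microbatch_df.dropDuplicates(["id"])
    (bronze.alias("t")
        .merge(deduped.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.table("bronze_temp")
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", checkpointPath)
    .trigger(once=True)
    .start())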

Cleaning Up Old Files:

  • To prevent old files from being picked up again, consider implementing a cleanup process that moves or archives files out of the source directory once they have been ingested, so the stream never re-lists them. A sketch follows.
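
One possible sketch, assuming processed files can be moved to an archive folder (the archive path and the 8-day cutoff are illustrative, and f.modificationTime assumes a runtime where dbutils.fs.ls returns modification times):

%python
import time

archive_dir = source_dir + "/archive"              # hypothetical archive location
cutoff_ms = (time.time() - 8 * 24 * 3600) * 1000   # older than ~8 days, in milliseconds

# Move files past the cutoff out of the watched source directory.
for f in dbutils.fs.ls(sourcePath):
    if f.modificationTime < cutoff_ms:
        dbutils.fs.mv(f.path, archive_dir + "/" + f.name)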

I hope these suggestions help you resolve the issue and improve your bronze layer’s data ingestion process! 🚀
