Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

COPY INTO generating duplicate rows in Delta table

dataexplorer
New Contributor III

Hello Everyone,

I'm trying to bulk load tables from a SQL Server database into ADLS as Parquet files and then load these files into Delta tables (raw/bronze). I did a one-off history/base load, but my subsequent incremental loads (which have a date overlap with the history load) are generating duplicates. Some reading I've done so far suggests that using native Python constructs such as a 'for' loop isn't recommended in Databricks. I can work around this by appending all the SQL statements together and executing them as a single statement, but I wanted to know whether this is the case, and why. Thank you.

UPDATE: I've removed the code from the for loop and tried execution for a single table, and it seems the issue isn't with the 'for' loop. If a record was loaded during the history load and occurs again in the incremental load, it still gets loaded into the table as a duplicate row, even though the row hasn't changed at all. Do the incremental load files need to be mutually exclusive, i.e. not contain any rows that overlap with previous files?

Code as below:

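# landingZoneLocation and processDate are assumed to be defined earlier in the notebook.
spark.sql("USE raw;")

files = dbutils.fs.ls(landingZoneLocation)

for fi in files:
  # In the Python dbutils API, isFile is a method on FileInfo, so it must be called;
  # a bare `fi.isFile` is always truthy and would let directories through.
  if fi.isFile():
    # "<Name>.parquet" -> "crm_<name>"
    delta_table_name = "crm_" + fi.name.split('.')[0].lower()
    deltaTableCopyIntoSQL = (
      f"COPY INTO delta.`dbfs:/mnt/raw/{delta_table_name}` "
      f"FROM 'dbfs:/mnt/landing/crm/{processDate}/{fi.name}' "
      "FILEFORMAT = PARQUET;"
    )
    print(deltaTableCopyIntoSQL)
    spark.sql(deltaTableCopyIntoSQL)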

1 ACCEPTED SOLUTION


-werners-
Esteemed Contributor III

@Umar Ayub, what you need is MERGE INTO, not COPY INTO.

With MERGE you can specify what records to insert, update, and delete.

https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/language-manual/delta-merge...
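As a rough illustration of the insert/update/delete control that MERGE gives, the sketch below only builds the SQL text, so it can be inspected before it is passed to `spark.sql`. Everything in it is an assumption for illustration: the helper `build_merge_sql`, the table path `crm_account`, the view name `incoming_rows`, the key column `id`, and the `is_deleted` flag do not come from the thread.

```python
def build_merge_sql(target_path, source_view, key_columns, delete_condition=None):
    """Return a MERGE INTO statement for a path-based Delta table (hypothetical helper)."""
    # Rows are matched on the key columns; "t" is the target, "s" the source.
    on_clause = " AND ".join(f"t.{col} = s.{col}" for col in key_columns)
    clauses = [
        f"MERGE INTO delta.`{target_path}` AS t",
        f"USING {source_view} AS s",
        f"ON {on_clause}",
    ]
    if delete_condition:
        # Optional: remove target rows that the source marks as deleted.
        clauses.append(f"WHEN MATCHED AND {delete_condition} THEN DELETE")
    clauses.append("WHEN MATCHED THEN UPDATE SET *")      # key exists: overwrite
    clauses.append("WHEN NOT MATCHED THEN INSERT *")      # key is new: insert
    return "\n".join(clauses)


# Assumed names: crm_account, incoming_rows, id, is_deleted.
example_sql = build_merge_sql(
    "dbfs:/mnt/raw/crm_account",
    "incoming_rows",
    ["id"],
    delete_condition="s.is_deleted = true",
)
print(example_sql)
```

In a notebook, the resulting string would be run with `spark.sql(example_sql)` once `incoming_rows` exists as a table or temp view. An unchanged row that arrives again matches on its key and is overwritten with identical values instead of being appended.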


6 REPLIES

652852
New Contributor III

According to the documentation, COPY INTO should not insert duplicate records; it should load a file only once. A caveat seems to be that all of the initial data must also have been loaded with COPY INTO, rather than by creating the table with a SELECT over some initial batch of files. At least, when I tried to update an existing table with new Parquet files from the same S3 storage location, the first run of COPY INTO duplicated everything.

Further, MERGE INTO does not appear to support merging from parquet files, so if I want to use that, I likely have to create a staging table in Delta. Sigh.

-werners-
Esteemed Contributor III

COPY INTO itself does not insert duplicates, but it is an append operation. So if identical data already resides in the table, you will have duplicates after the COPY INTO.

Merge is specifically made to tackle that.

And it does support merging from parquet files. Almost all of my merges have parquet as a source.
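The difference between skipping files and matching rows can be seen in a toy model. The sketch below is plain Python, not Databricks code: `copy_into` and `merge_into` are hypothetical stand-ins that only mimic the behaviour described in this thread (a file that was already loaded is skipped, but rows are never compared), and the sample rows and file names are invented.

```python
def copy_into(table, loaded_files, file_name, rows):
    """Append-style load: skips a file it has already seen, never inspects rows."""
    if file_name in loaded_files:
        return 0                      # same file again: nothing is loaded
    loaded_files.add(file_name)
    table.extend(rows)                # new file: every row is appended as-is
    return len(rows)


def merge_into(table, rows, key):
    """Key-based upsert: overwrite the row with a matching key, else insert."""
    index = {row[key]: i for i, row in enumerate(table)}
    for row in rows:
        if row[key] in index:
            table[index[row[key]]] = row
        else:
            index[row[key]] = len(table)
            table.append(row)


# Invented data: the incremental file overlaps the history file on id 2.
history = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
incremental = [{"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]

appended, seen_files = [], set()
copy_into(appended, seen_files, "history.parquet", history)
copy_into(appended, seen_files, "history.parquet", history)      # skipped
copy_into(appended, seen_files, "incremental.parquet", incremental)

merged = []
merge_into(merged, history, "id")
merge_into(merged, incremental, "id")

print([row["id"] for row in appended])  # [1, 2, 2, 3]
print([row["id"] for row in merged])    # [1, 2, 3]
```

In this model, re-running the same file is harmless, while a different file that carries an already-loaded row produces a duplicate. Only the key-based path keeps one row per `id`.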

652852
New Contributor III

The documentation states clearly that "This is a retriable and idempotent operation—files in the source location that have already been loaded are skipped." What it doesn't say is that this file load history is apparently specific to the COPY INTO operation--so in practice loading even the very first batch of data with COPY INTO is necessary to have a complete loading history and avoid reloading on subsequent runs.

But I'll happily use MERGE, though I haven't found the correct syntax to load from Parquet. Can you please share example code using MERGE INTO and Parquet?

-werners-
Esteemed Contributor III

Correct!

There is no specific syntax for parquet.

You just read the source data (the new incoming records) into a spark dataframe.

Use that dataframe for the merge (if you use sql, first create a temp view on the dataframe).
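The DataFrame route described above could look roughly like the sketch below, which uses the Delta Lake Python API (`DeltaTable.forPath(...).merge(...)`). It is an illustration under assumptions, not code from the thread: the function names, the choice of update-all/insert-all clauses, and the idea that each table has known key columns are all hypothetical.

```python
def merge_condition(key_columns):
    """Build the condition that matches target (t) rows to source (s) rows."""
    return " AND ".join(f"t.{col} = s.{col}" for col in key_columns)


def merge_parquet_into_delta(spark, source_path, target_path, key_columns):
    """Upsert the rows of a Parquet file or folder into an existing Delta table."""
    # Imported inside the function so merge_condition works without Delta installed.
    from delta.tables import DeltaTable

    # 1. Read the new incoming records into a Spark DataFrame.
    incoming = spark.read.parquet(source_path)

    # 2. Merge that DataFrame into the target table on the key columns.
    (
        DeltaTable.forPath(spark, target_path).alias("t")
        .merge(incoming.alias("s"), merge_condition(key_columns))
        .whenMatchedUpdateAll()       # key exists: overwrite with incoming values
        .whenNotMatchedInsertAll()    # key is new: insert the row
        .execute()
    )

    # SQL alternative: register the DataFrame as a temp view and reference
    # that view in the USING clause of a MERGE INTO statement.
    # incoming.createOrReplaceTempView("incoming_rows")


print(merge_condition(["id", "region"]))  # t.id = s.id AND t.region = s.region
```

A call inside the original loop might then be `merge_parquet_into_delta(spark, source_file_path, "dbfs:/mnt/raw/" + delta_table_name, ["id"])`, where `source_file_path` and the key column `id` are placeholders. MERGE raises an error when several source rows match the same target row, so the incoming data may need to be de-duplicated on the key first.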

dataexplorer
New Contributor III

thanks for the guidance!
