06-08-2022 02:10 PM
Hello Everyone,
I'm trying to bulk load tables from a SQL server database into ADLS as parquet files and then loading these files into Delta tables (raw/bronze). I had done a one off history/base load but my subsequent incremental loads (which had a date overlap with history load) are generating duplicates. Some reading I've done so far is pointing out that using native python code such as 'for' loop isn't recommended in Databricks - I can work around this by appending all the SQL statements together and execute with single statement but wanted to know if this was the case and why? Thank you.
UPDATE: I've removed the code from the for loop and tried execution for a single table and it seems issue isn't with the 'for' loop. If the record was loaded during history load and occurs again in the incremental load but the row hasn't changed at all - it is still getting loaded into the table as a duplicate row. Do the incremental load files need to be mutually exclusive i.e. not contain any overlapping rows with previous files?
Code as below:
spark.sql("USE raw;")
files = dbutils.fs.ls(landingZoneLocation)
for fi in files:
if fi.isFile:
delta_table_name = "crm_" + fi.name.split('.')[0].lower()
deltaTableCopyIntoSQL = "COPY INTO delta.`dbfs:/mnt/raw/"+delta_table_name+"` FROM 'dbfs:/mnt/landing/crm/"+processDate+"/"+fi.name+ "' FILEFORMAT = PARQUET;"
print(deltaTableCopyIntoSQL)
spark.sql(deltaTableCopyIntoSQL)
06-09-2022 12:22 AM
@Umar Ayub, What you need is MERGE INTO, not COPY INTO.
With MERGE you can specify what records to insert, update, and delete.
06-09-2022 12:22 AM
@Umar Ayub, What you need is MERGE INTO, not COPY INTO.
With MERGE you can specify what records to insert, update, and delete.
10-03-2022 11:30 PM
According to the documentation, COPY INTO should not be inserting duplicated records, it should load a file only once. A caveat seems to be that you have to have loaded all initial data with COPY INTO, rather than CREATE the table with SELECT on some initial batch of files. I at least tried to update an existing table with new parquet files from the same S3 storage location, and the first run of COPY INTO duplicated everything.
Further, MERGE INTO does not appear to support merging from parquet files, so if I want to use that, I likely have to create a staging table in Delta. Sigh.
10-04-2022 12:23 AM
copy into itself does not insert dups, but it is an append operation. So if identical data already resides in the table you will have dups after the copy into.
Merge is specifically made to tackle that.
And it does support merging from parquet files. Almost all of my merges have parquet as a source.
10-04-2022 12:55 AM
The documentation states clearly that "This is a retriable and idempotent operation—files in the source location that have already been loaded are skipped." What it doesn't say is that this file load history is apparently specific to the COPY INTO operation--so in practice loading even the very first batch of data with COPY INTO is necessary to have a complete loading history and avoid reloading on subsequent runs.
But, I'll happily use merge, though I haven't found correct syntax to load with parquet -- can you please share example code using MERGE INTO and parquet?
10-04-2022 01:09 AM
Correct!
There is no specific syntax for parquet.
You just read the source data (the new incoming records) into a spark dataframe.
Use that dataframe for the merge (if you use sql, first create a temp view on the dataframe).
06-09-2022 03:26 PM
thanks for the guidance!
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group