Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader to concatenate CSV files that update regularly into a single Parquet DataFrame

Kjetil
New Contributor III

I have multiple large CSV files. One or more of them changes now and then (a few times a day). The changes are both appends (new rows) and updates of existing rows. I want to combine all the CSV files into a single DataFrame and write it to Parquet, and I want to ensure that no rows are duplicated when a file is updated. Say I have three files, a.csv, b.csv and c.csv (all with the same schema), and c.csv is updated. I want a DataFrame that combines these three files into one. The code below does this, but I am not sure what happens when c.csv updates: will the new contents of c.csv overwrite the old information from c.csv? That is what I want to happen.
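
To make the dedup requirement concrete, here is a small, self-contained sketch (the column names id, value and updated_at are hypothetical, not from the original post) showing how "keep only the latest version of each row" can be expressed with a window function once the files are combined into one DataFrame:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Toy rows standing in for the combined contents of a.csv, b.csv and an
# updated c.csv: id 3 appears twice because c.csv was rewritten with a newer value.
rows = [
    ("1", "a", "2024-01-01"),
    ("2", "b", "2024-01-01"),
    ("3", "old", "2024-01-01"),
    ("3", "new", "2024-01-02"),
]
df = spark.createDataFrame(rows, ["id", "value", "updated_at"])

# Keep only the most recent row per id.
w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
latest = (
    df.withColumn("rn", F.row_number().over(w))
      .filter("rn = 1")
      .drop("rn")
)
latest.show()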

3 REPLIES

Kjetil
New Contributor III

Here is the code (I forgot to add it earlier):

(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "false")  # Set to "true" if the CSV files have a header row
    .schema(schema)  # Explicit schema for the CSV columns
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .load(source_files)
    .writeStream
    .format("parquet")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start(output_path)
)
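
For context, the snippet above references schema, checkpoint_dir, source_files and output_path without showing their definitions. A minimal sketch of what those might look like (the column names and paths below are assumptions, not from the original post):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Assumed definitions for the placeholders used in the stream above; adjust to your data.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", StringType(), True),
    StructField("updated_at", TimestampType(), True),
])
source_files = "/mnt/raw/csv/"                       # directory containing a.csv, b.csv, c.csv
checkpoint_dir = "/mnt/checkpoints/csv_autoloader/"  # stream checkpoint + schema location
output_path = "/mnt/curated/combined_parquet/"       # output directory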

-werners-
Esteemed Contributor III
(Accepted Solution)

Auto Loader expects new files, not updates/overwrites of existing files.

So basically Auto Loader looks for new file names in the directory and processes all new files. Depending on settings and file size, those files are processed one by one or all at the same time.

If you want to make sure you have no duplicates, you will have to create a function that processes each batch and is called by foreachBatch:
https://docs.databricks.com/en/structured-streaming/foreach.html
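
A minimal sketch of one way that foreachBatch pattern could look, assuming a hypothetical unique key column id, the placeholder variables from the snippet above, and an existing Delta table at output_path as the target (a plain Parquet sink cannot be merged into, so Delta is used here):

from delta.tables import DeltaTable
from pyspark.sql import DataFrame

def upsert_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Remove duplicate keys within the incoming micro-batch first.
    deduped = batch_df.dropDuplicates(["id"])
    target = DeltaTable.forPath(spark, output_path)  # assumes the Delta table already exists
    # Update rows whose key already exists, insert the rest.
    (target.alias("t")
        .merge(deduped.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

# Note: by default Auto Loader only picks up new file names, as described above;
# re-ingesting rewritten files may additionally require the cloudFiles.allowOverwrites
# option (check the Auto Loader docs for your runtime).
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "false")
    .schema(schema)
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .load(source_files)
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start()
)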

jose_gonzalez
Moderator

Hi @Kjetil,

Please let us know if you still have issues, or if @-werners-'s response can be marked as the best solution. Thank you!
