Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Auto Loader to concatenate regularly updated CSV files into a single parquet dataframe

Kjetil
New Contributor III

I have multiple large CSV files. One or more of these files changes now and then (a few times a day). The changes are both appends (new rows) and updates to existing rows. I want to combine all the CSV files into one dataframe and write it to parquet, and I want to ensure that no rows are duplicated when a file is updated. Say I have three files, a.csv, b.csv and c.csv, all with the same schema, and c.csv is updated. The code below combines the three files into one dataframe, but I am not sure what happens when c.csv is updated. Will everything in the new c.csv overwrite the old information from c.csv? That is what I want.

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

Auto Loader expects new files, not updates to or overwrites of existing files.

So basically Auto Loader looks for new file names in the directory and processes every file it has not seen before.
Depending on your settings and the file sizes, those files are processed one at a time or all in the same batch.

If you want to make sure you have no duplicates, you will have to write a function that runs on each micro-batch and pass it to foreachBatch.
https://docs.databricks.com/en/structured-streaming/foreach.html
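
One way this could look for the "new c.csv replaces old c.csv" case is sketched below. It is an assumption-heavy sketch, not a tested pipeline: it assumes the Auto Loader option cloudFiles.allowOverwrites is enabled so that a modified file is ingested again, that the _metadata.file_name column is available on your runtime, that the files have a header row, and that base file names are unique across the source directory. The names make_file_overwriter, start_stream and source_file are made up for this example. Each micro-batch is written with dynamic partition overwrite, so only the partitions for the files in that batch are replaced.

```python
def make_file_overwriter(output_path):
    """Build a foreachBatch handler that replaces the rows of each re-ingested file."""

    def write_batch(batch_df, batch_id):
        # batch_df is a regular (non-streaming) DataFrame for this micro-batch.
        (batch_df.write
            .mode("overwrite")
            # Dynamic mode: only partitions present in this batch are replaced,
            # so rows from untouched files stay where they are.
            .option("partitionOverwriteMode", "dynamic")
            .partitionBy("source_file")
            .parquet(output_path))

    return write_batch


def start_stream(spark, schema, source_files, output_path, checkpoint_dir):
    """Wire Auto Loader to the handler; all arguments come from your own notebook."""
    stream = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")  # assumption: the CSV files have a header row
        # Assumption: lets Auto Loader pick up a file again after it is modified.
        .option("cloudFiles.allowOverwrites", "true")
        .schema(schema)
        .load(source_files)
        # Tag every row with the file it came from (hypothetical column name).
        .selectExpr("*", "_metadata.file_name AS source_file"))

    return (stream.writeStream
        .foreachBatch(make_file_overwriter(output_path))
        .option("checkpointLocation", checkpoint_dir)
        .trigger(availableNow=True)
        .start())
```

Reading output_path back then gives one dataframe with the latest contents of every file, plus the extra source_file partition column. If you need row-level deduplication on a key instead of file-level replacement, the handler would have to do a merge against the existing data, which plain parquet does not support directly.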

3 REPLIES

Kjetil
New Contributor III

Here is the code (forgot to add)

(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")  # Assuming the CSV files have headers
    .schema(schema)  # Specify the schema here
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .load(source_files)
    .writeStream
    .format("parquet")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start(output_path)
)

jose_gonzalez
Moderator

Hi @Kjetil,

Please let us know if you still have an issue, or if @-werners-'s response can be marked as the best solution. Thank you.
