Autoloader to concatenate CSV files that update regularly into a single Parquet dataframe

Kjetil
Contributor

I have multiple large CSV files. One or more of them changes now and then (a few times a day). The changes are both appends (new rows) and updates to existing rows. I want to combine all the CSV files into one dataframe and then write it to Parquet, and I want to make sure no rows are duplicated when a file is updated. Say I have three files, a.csv, b.csv and c.csv, all with the same schema, and c.csv is updated. The code below combines the three files into one dataframe, but I am not sure what happens when c.csv changes. Will the new contents of c.csv replace everything that was previously loaded from c.csv? That is what I want.

Kjetil
Contributor

Here is the code (I forgot to add it):

(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "false")  # "false": the CSV files have no header row; use "true" if they do
    .schema(schema)  # Specify the schema explicitly
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .load(source_files)
    .writeStream
    .format("parquet")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)  # Process all files available now, then stop
    .start(output_path)
)

-werners-
Esteemed Contributor III

Auto Loader expects new files, not updates or overwrites of existing files.

So basically, Auto Loader looks for new file names in the directory and processes all new files.
Depending on settings (such as cloudFiles.maxFilesPerTrigger and cloudFiles.maxBytesPerTrigger) and file sizes, those files are processed one by one or all together in a single batch.

If you want to make sure you have no duplicates, you will have to write a function that runs on each micro-batch and pass it to foreachBatch.
https://docs.databricks.com/en/structured-streaming/foreach.html


jose_gonzalez
Databricks Employee

Hi @Kjetil,

Please let us know if you still have issues, or if @-werners-'s response could be marked as the best solution. Thank you!