Read and transform CSVs in parallel.

tanjil
New Contributor III

I need to read and transform several CSV files and then append them to a single data frame. I am able to do this in Databricks using a simple for loop, but I would like to speed this up.

Below is the rough structure of my code:

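import pandas as pd
# Collect the pieces and concatenate once; DataFrame.append was removed in pandas 2.0.
frames = []
for filepath in all_filepaths:
    df1 = read_file(filepath)
    df2 = transform(df1)
    frames.append(df2)
df3 = pd.concat(frames, ignore_index=True)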

Rather than processing one file at a time, is there a way to process them in parallel? There are plenty of solutions online, but the only one I could get to work in Databricks is the following:

from concurrent.futures import ThreadPoolExecutor
import pandas as pd
with ThreadPoolExecutor(max_workers=20) as pool:
    df3 = pd.concat(pool.map(read_and_transform_df, all_filepaths))

For 153 files, the first approach took 3.35 minutes and the second approach took 3.87 minutes.

Is there a way to optimize the second approach or an alternative faster approach?

Thanks,

Tanjil

1 ACCEPTED SOLUTION

Accepted Solutions

Hubert-Dudek
Esteemed Contributor III

Please consider loading the CSV files with Auto Loader.

This way, you will speed it up 🙂

https://docs.microsoft.com/en-us/azure/databricks/ingestion/auto-loader/schema

I know it can be confusing because Auto Loader is a stream, but please add

.trigger(availableNow=True)

to process everything just once and stop after all available files are loaded.
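A minimal sketch of what this could look like. It only runs on Databricks, where the "cloudFiles" source is available, and it assumes a runtime recent enough to support the availableNow trigger. The function name, the three directory arguments and the target table name are hypothetical placeholders, not something from this thread.

```python
def load_csvs_once(spark, source_dir, schema_dir, checkpoint_dir, target_table):
    """Ingest every CSV currently in source_dir with Auto Loader, then stop.

    `spark` is the SparkSession that Databricks notebooks provide.
    All paths and the table name are placeholders chosen by the caller.
    """
    stream = (
        spark.readStream.format("cloudFiles")               # Auto Loader source
        .option("cloudFiles.format", "csv")                 # files are CSV
        .option("cloudFiles.schemaLocation", schema_dir)    # where the inferred schema is stored
        .option("header", "true")
        .load(source_dir)
    )
    query = (
        stream.writeStream
        .option("checkpointLocation", checkpoint_dir)       # tracks which files were already loaded
        .trigger(availableNow=True)                         # process what is there, then finish
        .toTable(target_table)
    )
    query.awaitTermination()                                # block until the one-off run completes
    return query
```

Because the checkpoint remembers which files were already ingested, running the same function again later should only pick up files that arrived in the meantime.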

6 REPLIES

AmanSehgal
Honored Contributor III

You can read all the CSV files from a path using a wildcard character, for example spark.read.csv("path/*.csv")

However, since the files will be processed in parallel, you won't get your records in the exact same order as they exist in your CSV files.

To achieve that, you need to sort your DataFrame using a column that helps you identify which file a row is from.

If you don't have such a column, then create one. You can first write some simple code that adds a 'row_num' column, holding the row number, to each of your CSV files.

E.g. for file1 with 100 rows the values will be 0 to 99, then for the next file with 100 rows they will be 100 to 199, and so on.

Then you can read all the files at once and order the result by the row_num column.
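The numbering step described above could be sketched as follows, using only the Python standard library. This is an illustration under assumptions: the function name and the output directory are hypothetical, and every file is assumed to have a header row.

```python
import csv
from pathlib import Path


def add_row_numbers(filepaths, out_dir):
    """Copy each CSV into out_dir with a leading 'row_num' column.

    The counter keeps running across files, so the first file gets
    0..n-1, the next one continues at n, and so on.
    Returns the total number of data rows written.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    next_row = 0
    for filepath in filepaths:
        src = Path(filepath)
        with src.open(newline="") as fin, (out_dir / src.name).open("w", newline="") as fout:
            reader = csv.reader(fin)
            writer = csv.writer(fout)
            header = next(reader, None)
            if header is None:
                continue  # empty input file: nothing to number
            writer.writerow(["row_num"] + header)
            for row in reader:
                writer.writerow([next_row] + row)
                next_row += 1
    return next_row
```

After this, the numbered copies can be read in one go with the wildcard read and sorted by row_num to restore the original order.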

tanjil
New Contributor III

Hi,

Thank you for your reply. I have been able to read all the files in a single attempt using PySpark:

spk_df1 = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(all_filepaths.tolist())

However, I could not figure out how to perform any of the row-level transformations that are unique to each file. What would you suggest here?

AmanSehgal
Honored Contributor III

Can you filter the rows that belong to a particular file? If yes, take that subset of the DataFrame using a where clause (df.where()) and apply the respective transformations to it.
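One way this could look, as a rough sketch: tag every row with the file it came from, filter one subset per file, transform each subset and union the results. It assumes the SQL function input_file_name() is available in your runtime. The function name and the `transforms` mapping (file-name suffix to a function that takes and returns a Spark DataFrame) are hypothetical, and all transforms are assumed to return the same columns.

```python
from functools import reduce


def transform_per_file(df, transforms):
    """Apply a different transformation to the rows of each source file.

    df         : Spark DataFrame read from several CSV files at once.
    transforms : dict mapping a file-name suffix (e.g. "sales.csv") to a
                 function that takes and returns a Spark DataFrame.
    """
    if not transforms:
        raise ValueError("transforms must contain at least one entry")
    # Record which file each row was read from.
    tagged = df.selectExpr("*", "input_file_name() AS source_file")
    parts = []
    for suffix, fn in transforms.items():
        # Keep only the rows of the matching file, then transform them.
        subset = tagged.where(tagged["source_file"].endswith(suffix))
        parts.append(fn(subset))
    # Stitch the transformed subsets back together by column name.
    return reduce(lambda left, right: left.unionByName(right), parts)
```

Rows from files that match no suffix are dropped by this sketch, so the mapping needs an entry for every file you want to keep.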

Great recommendation @Hubert Dudek​ 

Vidula
Honored Contributor

Hi @tanjil​ 

Hope all is well! Just wanted to check in if you were able to resolve your issue and would you be happy to share the solution or mark an answer as best? Else please let us know if you need more help. 

We'd love to hear from you.

Thanks!
