09-01-2022 05:10 AM
I need to read and transform several CSV files and then append them to a single data frame. I am able to do this in Databricks using simple for loops, but I would like to speed this up.
Below is the rough structure of my code:
import pandas as pd

df3 = pd.DataFrame()
for filepath in all_filepaths:
    df1 = read_file(filepath)
    df2 = transform(df1)
    df3 = df3.append(df2)
Rather than processing one file at a time, is there a way to process them in parallel? There are plenty of solutions online, but I could only get the following to work in Databricks:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

with ThreadPoolExecutor(max_workers=20) as pool:
    df3 = pd.concat(pool.map(read_and_transform_df, all_filepaths))
For 153 files, the first approach took 3.35 mins and the second approach took 3.87 mins.
Is there a way to optimize the second approach, or is there an alternative, faster approach?
Thanks,
Tanjil
09-01-2022 06:19 AM
You can read all the CSV files from a path using a wildcard character, e.g. spark.read.csv("path/*.csv").
However, since the files will be processed in parallel, you won't get your records in the exact same order as they appear in your CSV files.
To achieve that, you need to sort the dataframe using a column that identifies which file (and position) a row came from.
If you don't have such a column, then create one: first write a simple script that adds a 'row_num' column to each of your CSV files, numbering the rows continuously across files.
For example, for file1 with 100 rows the values would be 0 to 99, then for the next file with 100 rows they would be 100 to 199, and so on.
Then you can read all the files at once and order the result by row_num; a rough sketch follows below.
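A minimal sketch of that idea, assuming the row_num column has already been written into each file and the files live under a hypothetical path /mnt/data/:

from pyspark.sql import functions as F

# read every CSV under the path in one pass
df = spark.read.option("header", "true").csv("/mnt/data/*.csv")

# restore the original record order using the pre-added row_num column
df_ordered = df.orderBy(F.col("row_num").cast("long"))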
09-01-2022 07:12 AM
Hi,
Thank you for your reply. I have been able to read all the files in a single pass using PySpark:
spk_df1 = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(all_filepaths.tolist())
However, I could not figure out how to perform any of the row-level transformations that are unique to each file. What would you suggest here?
09-01-2022 10:12 AM
Can you identify the rows that belong to a particular file? If yes, take a subset of the dataframe with a where clause (df.where()) and apply the respective transformations to that subset; see the sketch below.
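One way to make that per-file filter possible is to tag each row with its source file while reading. A rough sketch, assuming Spark's input_file_name() function and a hypothetical path and transformation:

from pyspark.sql import functions as F

# tag each row with the file it came from
df = (spark.read.option("header", "true")
      .csv("/mnt/data/*.csv")
      .withColumn("src_file", F.input_file_name()))

# take the subset for one file and apply its specific transformation (hypothetical example)
df_file1 = df.where(F.col("src_file").endswith("file1.csv"))
df_file1 = df_file1.withColumn("amount", F.col("amount").cast("double") * 2)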
09-01-2022 01:07 PM
Please consider loading the multiple CSV files with Auto Loader.
This way, you will speed it up 🙂
https://docs.microsoft.com/en-us/azure/databricks/ingestion/auto-loader/schema
I know that it can be confusing because it is a stream, but please add
.trigger(availableNow=True)
to process everything just once and finish after all the files are loaded.
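A minimal sketch of that pattern, with hypothetical source, schema, checkpoint, and table names:

# incrementally ingest all CSVs with Auto Loader, then stop once caught up
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .option("cloudFiles.schemaLocation", "/mnt/schemas/csv_ingest")  # hypothetical schema location
      .load("/mnt/data/"))  # hypothetical source directory

(df.writeStream
   .option("checkpointLocation", "/mnt/checkpoints/csv_ingest")  # hypothetical checkpoint path
   .trigger(availableNow=True)  # process all available files once, then finish
   .toTable("csv_ingest"))  # hypothetical target table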
09-09-2022 04:34 PM
Great recommendation @Hubert Dudek
09-20-2022 09:56 PM
Hi @tanjil
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!