Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Performance for pyspark dataframe is very slow after using a @pandas_udf

RRO
Contributor

Hello,

I am currently working on time series forecasting with FBProphet. Since my data contains many time series groups (~3,000), I use a @pandas_udf to parallelize the training.

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def forecast_netprofit(prophtrain):

    ...

    return results_pd


time_series_id_column_names = ['Grp1', 'Grp2', 'Grp3']

results = (prophtrain
           .groupby(time_series_id_column_names)
           .apply(forecast_netprofit))

Now every time I want to display or run operations on the results dataframe, performance is very poor. For example, just displaying the first 1,000 rows takes around 6 minutes.

Is there a reason why the performance of the results is so slow and can I fix that somehow?

1 ACCEPTED SOLUTION


RRO
Contributor

Thank you for the answers.

Unfortunately this did not solve the performance issue.

What I did now is I saved the results into a table:

results.write.mode("overwrite").saveAsTable("db.results")

This is probably not the best solution, but after doing that I can work with the results data from the table.


6 REPLIES

-werners-
Esteemed Contributor III

Spark evaluates lazily: it may run the UDF over the whole dataset in the background and only then return 1,000 rows of the result. So it might be that, and not necessarily the function itself.

You can test that by, e.g., starting with a dataset of 1,000 records and applying the function to that.
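That check can even be sketched without a cluster: pull a small slice to the driver and time the per-group logic directly in pandas. A sketch, where `forecast_one_group` is a hypothetical stand-in for the pandas body of the UDF, and the column names are made up for the demo (the Spark-side slice is shown as a comment):

```python
import time
import pandas as pd

# Hypothetical stand-in for the pandas body of the grouped UDF;
# the real version would fit a Prophet model on the group's series.
def forecast_one_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf

# On the cluster this slice would come from Spark:
# sample_pd = prophtrain.limit(1000).toPandas()
sample_pd = pd.DataFrame({'Grp1': ['a'] * 500 + ['b'] * 500,
                          'net_profit': range(1000)})

# Time the function applied group by group, mimicking the grouped UDF.
t0 = time.perf_counter()
out = pd.concat(forecast_one_group(g) for _, g in sample_pd.groupby('Grp1'))
elapsed = time.perf_counter() - t0
print(f"{len(out)} rows in {elapsed:.3f}s")
```

If this finishes in seconds, the slowness is more likely Spark re-running the whole pipeline on each action than the function body itself.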

RRO
Contributor

Alright, the dataset has around 80,000 rows and 12 columns, so it should not be too much to handle. I have datasets bigger than that which can be displayed within seconds. That is why I think it might be somehow related to the function...

-werners-
Esteemed Contributor III

Could be, although it should use Arrow these days.

What version of spark do you use?

Databricks Runtime Version: 10.3 ML (includes Apache Spark 3.2.1, Scala 2.12)

Hubert-Dudek
Esteemed Contributor III

Please specify the type hints in the function, so you will save some time. Something similar to this (different hints may be needed; it is just an example):

@pandas_udf(schema)
def forecast_netprofit(prophtrain: pd.Series) -> pd.Series:
    ...

You could also consider using .agg(forecast_netprofit(prophtrain)) instead of .apply().
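For the grouped-map case specifically, the function receives and returns a whole pandas DataFrame per group, and on Spark 3.x (which DBR 10.3 includes) the preferred spelling is `groupby(...).applyInPandas(func, schema)` rather than the deprecated `PandasUDFType.GROUPED_MAP`. A minimal sketch; the per-group body is a placeholder and the Spark call site is shown as a comment, since it needs a live session:

```python
import pandas as pd

# Per-group function for applyInPandas: takes one group's rows as a
# pandas DataFrame and returns a pandas DataFrame matching the output schema.
def forecast_netprofit(pdf: pd.DataFrame) -> pd.DataFrame:
    # Placeholder body; the real version would fit Prophet on this group.
    return pdf

# Spark 3.x call site (no decorator needed; `schema` is the output schema):
# results = (prophtrain
#            .groupby('Grp1', 'Grp2', 'Grp3')
#            .applyInPandas(forecast_netprofit, schema=schema))

# The function itself can be exercised with plain pandas:
demo = pd.DataFrame({'Grp1': ['a', 'a', 'b'], 'net_profit': [1.0, 2.0, 3.0]})
forecast = forecast_netprofit(demo)
```

A nice side effect of this split is that the per-group function stays an ordinary pandas function, so it can be unit-tested on the driver before being handed to Spark.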

