
In the world of data science, there is often a need to optimize or migrate legacy code. In this blog post, we address a common technical challenge faced by many data scientists and engineers - making existing Pandas codebases more scalable and dynamic - by using approaches such as applyInPandas and Pandas UDFs. We’ll walk through a basic Pandas UDF use case before showing how to pass parameters to applyInPandas and Pandas UDFs using closures. This approach can help you tune your code and make the most of the powerful features offered by Databricks.

What are Pandas UDFs?

Pandas UDFs (User Defined Functions) are a powerful feature that lets you apply custom functions to a Pandas DataFrame or Series in a vectorized manner. With the help of PyArrow, Pandas UDFs can significantly improve performance compared to traditional for-loops. The applyInPandas function is a great example of how Pandas UDFs can be used to perform operations on the data in a DataFrame. Pandas UDFs can also be defined using the pandas_udf decorator, which allows you to specify the input and output types of the function. Once defined, the UDF can be applied in parallel across a Spark DataFrame - far faster than the serial operation of a for-loop.
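
To make that concrete, here's a minimal sketch of a Pandas UDF defined with the pandas_udf decorator (the function and column names are purely illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# A vectorized UDF: Spark passes a pd.Series per batch and expects a pd.Series back
@pandas_udf('double')
def fahrenheit_to_celsius(temp_f: pd.Series) -> pd.Series:
    return (temp_f - 32) * 5 / 9

# Used like any other column expression, e.g.:
# df.withColumn('temp_c', fahrenheit_to_celsius('temp_f'))

Because the function operates on whole batches of rows at a time, Spark can run it in parallel across the cluster rather than looping row by row.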

Pandas UDFs Use Cases

Pandas UDFs can be used for a variety of tasks, such as data cleaning, feature engineering, and data analysis. They’re often used to transition existing Pandas code from a single-node environment to a distributed Spark environment without having to change the logic or libraries being used. Here’s an example of using applyInPandas to normalize the values of a Spark DataFrame for each engine type:

 

import pandas as pd

df = spark.createDataFrame(pd.DataFrame({'type': ['turbine', 'turbine', 'propeller', 'turbine', 'propeller', 'propeller'],
                                         'sensor_reading': [10, 7, 25, 12, 29, 36]}))

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    reading = pdf.sensor_reading
    pdf['normalized'] = reading.mean() / reading.std()
    return pdf

expected_schema = 'type string, sensor_reading long, normalized long'
df.groupBy('type').applyInPandas(normalize, expected_schema).show()

 

Output:

+---------+--------------+----------+
|     type|sensor_reading|normalized|
+---------+--------------+----------+
|propeller|            25|         5|
|propeller|            29|         5|
|propeller|            36|         5|
|  turbine|            10|         3|
|  turbine|             7|         3|
|  turbine|            12|         3|
+---------+--------------+----------+

Challenge: Passing Parameters to applyInPandas

What if we want to do some hyperparameter tuning on our Pandas UDF, or use a dynamic variable as an input to our function? Unfortunately, passing extra parameters to applyInPandas is not directly supported. applyInPandas expects a function whose only argument is the grouped DataFrame it will be applied to (optionally preceded by the grouping key). Adding another parameter will throw an error:

 

# We don't have a way to pass a value like the mean of the whole dataframe - this throws an error
def normalize_plus_value(pdf: pd.DataFrame, value: int) -> pd.DataFrame:
    reading = pdf.sensor_reading
    pdf['normalized'] = value + (reading.mean() / reading.std())
    return pdf

df.groupBy('type').applyInPandas(normalize_plus_value, 'type string, sensor_reading long, normalized long').show()

 

AttributeError: 'tuple' object has no attribute 'sensor_reading'

The error occurs because applyInPandas interprets a two-argument function as taking the grouping key (a tuple) followed by the grouped DataFrame, so pdf receives the key tuple instead of our data.

Solution: Using closures to pass parameters

One solution to this problem is to use a closure. A closure is a function that has access to variables in its outer (enclosing) function scope. By defining your function inside another, enclosing function, you can create a dynamic function that captures the parameters you want to pass to applyInPandas.
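
To see the mechanics in isolation, here's a minimal, Spark-free sketch of a closure:

def make_multiplier(factor: int):
    # multiply "closes over" factor, so it remembers the value even after make_multiplier returns
    def multiply(x: int) -> int:
        return x * factor
    return multiply

double = make_multiplier(2)
print(double(10))  # 20

The same pattern lets us bake extra parameters into a function that still takes only the single argument applyInPandas expects.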

Here is an example of using a closure to pass a parameter to an applyInPandas function:

 

def normalize_with_value(value: int):
    # Returning this function "injects" the value into the function we'll use for applyInPandas
    def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
        reading = pdf.sensor_reading
        pdf['normalized'] = value - (reading.mean() / reading.std())
        return pdf
    return normalize

# Now we can initialize the function with a value inserted
average = df.selectExpr('avg(sensor_reading) as average').collect()[0][0]
dynamic_normalize = normalize_with_value(average)
df.groupBy('type').applyInPandas(dynamic_normalize, 'type string, sensor_reading long, normalized long').show()

 

Output:

+---------+--------------+----------+
|     type|sensor_reading|normalized|
+---------+--------------+----------+
|propeller|            25|        14|
|propeller|            29|        14|
|propeller|            36|        14|
|  turbine|            10|        15|
|  turbine|             7|        15|
|  turbine|            12|        15|
+---------+--------------+----------+

We can do the same with a Pandas UDF. For the purposes of this demonstration, we’ll pass the hyperparameters for an ARIMA model into a Pandas UDF:

 

from pyspark.sql.functions import pandas_udf
from statsmodels.tsa.arima.model import ARIMA

# Fit and run an ARIMA model using a Pandas UDF with the hyperparameters passed in
def create_arima_forecaster(order):
    @pandas_udf("double")
    def forecast_arima(value: pd.Series) -> pd.Series:
        model = ARIMA(value, order=order)
        model_fit = model.fit()
        return model_fit.predict()
    return forecast_arima

# Minimal Spark code - just select one column and add another. We can still use Pandas for our logic
forecast_arima = create_arima_forecaster((1, 2, 3))
df.withColumn('predicted_reading', forecast_arima('sensor_reading')).show()

 

 

Applications of Parameters for Pandas UDFs

Passing parameters to Pandas UDFs can be useful in a variety of scenarios, such as:

  • Hyperparameter tuning
  • Object-oriented programming with applyInPandas - for example, a dynamic feature engineering function in an MLflow Pyfunc model class (see the sketch after this list)
  • Building complex and dynamic data pipelines
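
Here's a simplified sketch of the object-oriented pattern (the FeatureEngineer class and its offset attribute are hypothetical stand-ins for something like an MLflow pyfunc model class). An instance attribute gets captured by the closure just like a local variable does:

class FeatureEngineer:
    def __init__(self, offset: float):
        self.offset = offset

    def make_normalizer(self):
        offset = self.offset  # copy the attribute into a local so the closure captures it
        def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
            reading = pdf.sensor_reading
            pdf['normalized'] = offset + (reading.mean() / reading.std())
            return pdf
        return normalize

engineer = FeatureEngineer(offset=10.0)
df.groupBy('type').applyInPandas(
    engineer.make_normalizer(),
    'type string, sensor_reading long, normalized double'
).show()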

For example, if we wanted to hyperparameter tune an ARIMA model called from a Pandas UDF, we could use the same approach to pass the various hyperparameter choices:

 

 

from hyperopt import hp, fmin, tpe, Trials
from pyspark.ml.evaluation import RegressionEvaluator

# Define the hyperparameter search space
search_space = {'p': 1, 'd': hp.quniform('d', 2, 3, 1), 'q': hp.quniform('q', 2, 4, 1)}

# Define the objective function to be minimized
def objective(params):
    # hp.quniform returns floats, so cast to int before building the ARIMA order
    order = (int(params['p']), int(params['d']), int(params['q']))
    forecast_arima = create_arima_forecaster(order)
    arima_output = df.withColumn('predicted_reading', forecast_arima('sensor_reading'))
    evaluator = RegressionEvaluator(predictionCol="predicted_reading",
                                    labelCol="sensor_reading",
                                    metricName="rmse")
    rmse = evaluator.evaluate(arima_output)
    return rmse

# Run the hyperparameter optimization
trials = Trials()
best = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=6, trials=trials)
print('Best hyperparameters: ', best)

 

 

In this example, we're using Hyperopt to automatically identify the most effective configuration of our model. The objective function accepts a set of hyperparameters as input, creates an ARIMA forecaster with those hyperparameters using the create_arima_forecaster function, and applies the forecaster to the input DataFrame using the Pandas UDF forecast_arima. The resulting DataFrame is then evaluated so that the root mean squared error (RMSE) can be returned as the objective function value. In this case, we're searching over values of p, d, and q for the ARIMA model. The hyperparameter search space is defined using Hyperopt's hp functions, which allow us to specify the range of values to search over for each hyperparameter. Finally, we use Hyperopt's fmin function to perform the hyperparameter optimization, specifying the objective function, search space, optimization algorithm, and number of evaluations to perform. The resulting optimal hyperparameters are then printed out and can be leveraged for any forecasts moving forward.

It’s useful to note that running Hyperopt for parallel hyperparameter tuning on Databricks Runtime can also be done via SparkTrials in place of standard Hyperopt Trials. SparkTrials automatically distributes each run of the objective function across the Spark cluster and makes it easy to configure your desired parallelism. In fact, SparkTrials is what powers Databricks’ AutoML under the hood!
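
For reference, switching to SparkTrials is a small change. This is only a sketch: because SparkTrials ships each evaluation of the objective out to a worker, the objective itself should run single-node code (for example, plain pandas and statsmodels) rather than operating on Spark DataFrames like the objective above.

from hyperopt import SparkTrials

# Each call to the objective runs on an executor; parallelism caps concurrent trials
spark_trials = SparkTrials(parallelism=4)
best = fmin(fn=objective, space=search_space, algo=tpe.suggest,
            max_evals=6, trials=spark_trials)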

Conclusion

In this blog post, we have learned about a neat programming trick for passing parameters to applyInPandas and Pandas UDFs. We have also looked at a viable use case for this tactic: hyperparameter tuning. To explore the topic further, we encourage you to investigate using Python’s functools.partial to accomplish the same goal in a slightly different way, as sketched below. Ultimately, we hope you better understand how to use closures to make legacy or custom Pandas code more dynamic and scalable.
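
As a parting sketch, the functools.partial version binds the extra argument up front so the resulting callable takes only the DataFrame. This assumes applyInPandas accepts the partially applied function, and it reuses the normalize_plus_value and average defined earlier:

from functools import partial

# Bind value ahead of time; the remaining signature is just (pdf)
normalize_with_average = partial(normalize_plus_value, value=average)
df.groupBy('type').applyInPandas(
    normalize_with_average,
    'type string, sensor_reading long, normalized long'
).show()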