Performance issue while calling mlflow endpoint

sanjay
Valued Contributor II

Hi,

I have a PySpark DataFrame and a PySpark UDF that calls an MLflow model for each row, but its performance is too slow.

Here is the sample code:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def myfunc(input_text):
    result = mlflowmodel.predict(input_text)
    return result

myfuncUDF = udf(myfunc, StringType())

df = spark.sql("select * from test")
df = df.withColumn("test_result", myfuncUDF("input_text"))

Please suggest how to improve the performance.

Regards,
Sanjay

12 REPLIES

Kaniz
Community Manager

Hi @sanjay, one common problem when using PySpark UDFs is poor performance. Because PySpark UDFs cannot use Spark's built-in optimizations, data is processed row by row, which is not an efficient approach.

 

Allow me to offer some recommendations for enhancing performance: 

 

  1. Use vectorized UDFs (Pandas UDFs): these let you operate on pandas Series or DataFrames inside the UDF, processing a batch of rows at a time, which is much faster than row-at-a-time UDFs.
  2. Broadcast small data: if your MLflow model is reasonably small, broadcast it to all worker nodes once instead of shipping it with every task; this minimizes communication costs between nodes.
  3. Avoid UDFs where possible: PySpark's built-in functions are optimized by the Catalyst optimizer and are usually far more efficient than UDFs.

     We hope this information is useful to you! 😊
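To make the first recommendation concrete, here is a minimal pandas-only sketch (no Spark required) of why batch-oriented code beats row-at-a-time calls: one vectorized call replaces N Python-level calls. `fake_predict_one` and `fake_predict_batch` are hypothetical stand-ins for a model's predict method, not part of the original code.

```python
import pandas as pd

def fake_predict_one(text: str) -> str:
    # Stand-in for a per-row model call, invoked once per row
    return text.strip().lower()

def fake_predict_batch(texts: pd.Series) -> pd.Series:
    # Stand-in for a batch model call, invoked once per batch
    return texts.str.strip().str.lower()

s = pd.Series(["  Alpha", "BETA  ", " Gamma "])
row_wise = s.apply(fake_predict_one)   # what a classic UDF effectively does
batched = fake_predict_batch(s)        # what a pandas UDF enables
assert row_wise.equals(batched)
```

Both paths produce the same result; the batch path simply makes far fewer Python-level calls.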

sanjay
Valued Contributor II

Thank you Kaniz for the suggestions, this is really helpful. I also tried applyInPandas, but I am not sure whether it is better than a Spark UDF. If not, can you help me convert this function to a pandas UDF or another optimized function?

 

def myfunc(input_text):
    result = mlflowmodel.predict(input_text)
    return result

def myfuncUDF(pdf):
    pdf['test_result'] = pdf["input_text"].apply(myfunc)
    return pdf

df = spark.sql("select * from test")
# applyInPandas requires an output schema; the column list below is an example
df = df.groupBy("id").applyInPandas(
    myfuncUDF, schema="id long, input_text string, test_result string")
 
Regards,
Sanjay

Kaniz
Community Manager

Hi @sanjay,  Let’s convert your existing function to a Pandas UDF in PySpark. First, we’ll create a Pandas UDF that performs the same operation as your myfunc. Then, we’ll apply this UDF to your DataFrame.

 

Here’s the modified code:

 

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Define your function (equivalent to myfunc)
def myfunc(input_text):
   # Assuming mlflowmodel is defined elsewhere
   result = mlflowmodel.predict(input_text)
   return result

# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series) -> pd.Series:
   return input_text_series.apply(myfunc)

# Load your DataFrame
df = spark.sql("SELECT * FROM test")

# Apply the UDF to the DataFrame; the result appears in the "test_result" column
df = df.withColumn("test_result", myfunc_udf(df["input_text"]))
 

A few notes:

  • Replace mlflowmodel.predict(input_text) with your actual prediction logic.
  • Make sure the data types match your use case (e.g., adjust StringType() if needed).
  • The myfunc_udf receives each batch of rows as a pandas Series. Note that Series.apply still calls myfunc once per row within the batch; the batching mainly reduces serialization overhead, and the biggest gains come when the model can predict a whole batch at once.
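On that last note: if the model can score a whole batch in one call (as many MLflow pyfunc models can), the per-row apply can be replaced by a single call per batch. A minimal pandas-only sketch, with `BatchModel` as a hypothetical stand-in for the loaded model:

```python
import pandas as pd

class BatchModel:
    """Hypothetical stand-in for a model whose predict() accepts a sequence."""
    def predict(self, texts):
        return [t.upper() for t in texts]

model = BatchModel()

def predict_batch(input_text_series: pd.Series) -> pd.Series:
    # One model call for the whole batch instead of one call per row
    return pd.Series(model.predict(input_text_series.tolist()),
                     index=input_text_series.index)

out = predict_batch(pd.Series(["a", "b"]))
```

The same body would work inside a pandas UDF, since a Series-to-Series pandas UDF receives and returns one batch at a time.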

Feel free to adapt this code to your specific requirements! 😊

sanjay
Valued Contributor II

Thank you @Kaniz, it's really helpful and it worked. Another quick question: I have to pass 2 parameters as input to myfunc. Please help me pass multiple parameters.

def myfunc(input_text, param2):
   # Assuming mlflowmodel is defined elsewhere
   result = mlflowmodel.predict(input_text, param2)
   return result

# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
   return input_text_series.apply(myfunc) ??

 

Kaniz
Community Manager

Hi @sanjay, to pass multiple parameters to a pandas UDF in PySpark, define the UDF to accept multiple pandas Series as input and return a pandas Series as output. Here's how you can modify the myfunc_udf definition to accept the second parameter param2_series:

# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
    # Series.apply cannot feed a second Series row by row,
    # so zip the two batches and call myfunc once per row
    return pd.Series(
        [myfunc(input_text, param2)
         for input_text, param2 in zip(input_text_series, param2_series)],
        index=input_text_series.index,
    )

In this example, the two batches are zipped together so that myfunc is called once per row with the matching input_text and param2 values, and the results are collected back into a pandas Series.

Now you can use myfunc_udf in your PySpark code, just like you did before:

df.withColumn("result", myfunc_udf("input_text_column", "param2_column"))

Remember to replace "input_text_column" and "param2_column" with the actual column names that contain the input text and the second parameter, respectively.
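Independent of the exact UDF wiring, the row-by-row combination of two Series can be checked with plain pandas. A minimal sketch; `myfunc` here is a trivial stand-in for the model call:

```python
import pandas as pd

def myfunc(input_text, param2):
    # Trivial stand-in for mlflowmodel.predict(input_text, param2)
    return f"{input_text}:{param2}"

texts = pd.Series(["x", "y"])
params = pd.Series(["1", "2"])
# Zip the two batches and call myfunc once per row
result = pd.Series([myfunc(t, p) for t, p in zip(texts, params)],
                   index=texts.index)
```

Each element of `result` pairs the text with its matching parameter from the same row.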

 

sanjay
Valued Contributor II

Hi Kaniz,

I started getting following error after using myfunc_udf with 2 parameters.

pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()

Regards,

Sanjay

sanjay
Valued Contributor II

Hi @Kaniz Appreciate if you can help in resolving this issue.

Regards,

Sanjay

Isabeente
New Contributor II

I also need to pass two arguments to myfunc, so I have the same brief question: how do I pass multiple parameters?

BR_DatabricksAI
Contributor

Hello Sanjay, 

Could you please share your code snippet as per the latest changes?

sanjay
Valued Contributor II
Hi,
 
Here is code snippet as per latest changes.
 
def myfunc(t1, t2):
    return 'test'

@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
    def apply_myfunc(input_text, param2):
        return myfunc(input_text, param2)
    return input_text_series.apply(apply_myfunc, param2_series)
 
df.withColumn("result", myfunc_udf("input_text1", "input_text2"))
 
But I am getting error while running this

pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()

BR_DatabricksAI
Contributor

Hello Sanjay,

The above code doesn't define df. Can you share your df.show() output?

Isabeente
New Contributor II

So good
