01-28-2024 11:24 AM
Hi,
I have pyspark dataframe and pyspark udf which calls mlflow model for each row but its performance is too slow.
Here is sample code
def myfunc(input_text):
restult = mlflowmodel.predict(input_text)
return result
myfuncUDF = udf(myfunc,StringType())
df = spark.sql("select * from test")
df=df.withColumn("test_result",myfuncUDF("input_text"))
Please suggest how to improve the performance.
Regards,
Sanjay
01-29-2024 12:13 AM - edited 01-29-2024 12:16 AM
Hi @sanjay, One common problem when using PySpark UDFs is poor performance. This is due to the fact that PySpark UDFs cannot utilize the built-in optimizations of Spark. As a result, data is processed on a row-by-row basis, which is not an efficient approach.
Allow me to offer some recommendations for enhancing performance:
01-31-2024 04:28 AM
Thank you Kaniz for the suggestions. This is really helpful. I even tried using applyInPandas. Not sure if this is better than spark UDF. If not can you help me in converting this function to pandas udf or any other optimized function.
02-06-2024 01:18 AM
Hi @sanjay, Let’s convert your existing function to a Pandas UDF in PySpark. First, we’ll create a Pandas UDF that performs the same operation as your myfunc. Then, we’ll apply this UDF to your DataFrame.
Here’s the modified code:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType# Define your function (equivalent to myfunc)
def myfunc(input_text):
# Assuming mlflowmodel is defined elsewhere
result = mlflowmodel.predict(input_text)
return result# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series) -> pd.Series:
return input_text_series.apply(myfunc)# Load your DataFrame
df = spark.sql("SELECT * FROM test")# Apply the UDF to the DataFrame
df = df.withColumn("test_result", myfunc_udf(df["input_text"]))# Group by "id" and apply the UDF
grouped_df = df.groupBy("id").applyInPandas(myfunc_udf)# Now you have the result in the "test_result" column of the grouped DataFrame
A few notes:
Feel free to adapt this code to your specific requirements! 😊
02-06-2024 11:36 PM
Thank you @Kaniz_Fatma, its really helpful and did worked. Another quick question, I have to pass 2 parameters as input to myfunc. Please help how to pass multiple parameters.
def myfunc(input_text, param2):
# Assuming mlflowmodel is defined elsewhere
result = mlflowmodel.predict(input_text, param2)
return result
# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
return input_text_series.apply(myfunc) ??
02-07-2024 03:45 AM
Hi @sanjay, To pass multiple parameters to a pandas UDF in PySpark, you can define the UDF to accept multiple pandas Series as input and return a pandas Series as output. Here's how you can modify the myfunc_udf
definition to accept the second parameter param2_series
:
# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
def apply_myfunc(input_text, param2):
return myfunc(input_text, param2)
return input_text_series.apply(apply_myfunc, param2_series=param2_series)
In this example, I've defined an inner function apply_myfunc
that takes both input_text
and param2
as parameters and calls myfunc
with these parameters. Then, the pandas Series input_text_series
is applied using the apply
function with apply_myfunc
as the function to apply and param2_series
as the keyword argument.
Now you can use myfunc_udf
in your PySpark code, just like you did before:
df.withColumn("result", myfunc_udf("input_text_column", "param2_column"))
Remember to replace "input_text_column"
and "param2_column"
with the actual column names that contain the input text and the second parameter, respectively.
02-07-2024 05:05 AM
Hi Kaniz,
I started getting following error after using myfunc_udf with 2 parameters.
pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()
Regards,
Sanjay
02-14-2024 06:42 AM
02-07-2024 01:40 AM
I need to send two arguments to myfunc, thus I have another brief question. I need some guidance on how to pass in many parameters.
02-07-2024 06:12 AM
Hello Sanjay,
Could you please share your code snippet as per latest changes?
02-07-2024 07:26 AM - edited 02-07-2024 07:39 AM
pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()
02-07-2024 09:25 PM
Hello Sanjay,
The above code don't have the df defined. Can you share your df.show() output.
03-18-2024 07:19 PM
So good
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group