01-28-2024 11:24 AM
Hi,
I have a PySpark DataFrame and a PySpark UDF that calls an MLflow model for each row, but its performance is too slow.
Here is the sample code:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def myfunc(input_text):
    # mlflowmodel is an MLflow model loaded elsewhere
    result = mlflowmodel.predict(input_text)
    return result

myfuncUDF = udf(myfunc, StringType())
df = spark.sql("select * from test")
df = df.withColumn("test_result", myfuncUDF("input_text"))
Please suggest how to improve the performance.
Regards,
Sanjay
01-29-2024 12:13 AM - edited 01-29-2024 12:16 AM
Hi @sanjay, a common problem when using PySpark UDFs is poor performance. Because PySpark UDFs cannot use Spark's built-in optimizations, data is processed row by row, which is not an efficient approach.
Allow me to offer some recommendations for enhancing performance:
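For instance, one common recommendation is to let the model score a whole batch at once instead of one row at a time, which is the idea behind pandas UDFs. Here is a minimal pandas-level sketch of the difference, using an upper-casing function as a stand-in for mlflowmodel.predict (an assumption for illustration, since the real model is not shown):

```python
import pandas as pd

# Stand-in for mlflowmodel.predict; assumes the real model can score
# a whole batch (a Series of inputs) in one call.
def predict_batch(texts: pd.Series) -> pd.Series:
    return texts.str.upper()

# What a plain PySpark UDF does: one Python-level call per row.
def predict_one(text: str) -> str:
    return text.upper()

batch = pd.Series(["spark", "udf"])
per_row = batch.apply(predict_one)   # row-at-a-time, per-call overhead on every record
vectorized = predict_batch(batch)    # one call for the whole batch

print(per_row.tolist())      # ['SPARK', 'UDF']
print(vectorized.tolist())   # ['SPARK', 'UDF']
```

Both produce the same result, but the batched version pays the Python call overhead once per batch rather than once per row, which is where the pandas UDF speedup comes from.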
01-31-2024 04:28 AM
Thank you, Kaniz, for the suggestions; this is really helpful. I also tried using applyInPandas, but I'm not sure whether it is faster than a Spark UDF. If not, can you help me convert this function to a pandas UDF or another optimized function?
02-06-2024 01:18 AM
Hi @sanjay, Let's convert your existing function to a Pandas UDF in PySpark. First, we'll create a Pandas UDF that performs the same operation as your myfunc. Then, we'll apply this UDF to your DataFrame.
Here's the modified code:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Define your function (equivalent to myfunc)
def myfunc(input_text):
    # Assuming mlflowmodel is defined elsewhere
    result = mlflowmodel.predict(input_text)
    return result

# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series) -> pd.Series:
    return input_text_series.apply(myfunc)

# Load your DataFrame
df = spark.sql("SELECT * FROM test")

# Apply the UDF to the DataFrame; the result is in the "test_result" column
df = df.withColumn("test_result", myfunc_udf(df["input_text"]))
Note that applyInPandas is a different mechanism: it expects a grouped-map function that takes and returns a pandas DataFrame, together with an output schema, so the pandas UDF above cannot be passed to it directly.
Feel free to adapt this code to your specific requirements!
02-06-2024 11:36 PM
Thank you @Kaniz_Fatma, it's really helpful and it worked. Another quick question: I have to pass 2 parameters as input to myfunc. Please advise on how to pass multiple parameters.
def myfunc(input_text, param2):
    # Assuming mlflowmodel is defined elsewhere
    result = mlflowmodel.predict(input_text, param2)
    return result

# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
    return input_text_series.apply(myfunc)  # ?? how do I pass param2_series here?
02-07-2024 03:45 AM
Hi @sanjay, To pass multiple parameters to a pandas UDF in PySpark, you can define the UDF to accept multiple pandas Series as input and return a pandas Series as output. Here's how you can modify the myfunc_udf definition to accept the second parameter param2_series:
# Create a Pandas UDF
@pandas_udf(StringType())
def myfunc_udf(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
    def apply_myfunc(input_text, param2):
        return myfunc(input_text, param2)
    return input_text_series.apply(apply_myfunc, param2_series=param2_series)
In this example, I've defined an inner function apply_myfunc that takes both input_text and param2 as parameters and calls myfunc with these parameters. Then, the pandas Series input_text_series is applied using the apply function, with apply_myfunc as the function to apply and param2_series as the keyword argument.
Now you can use myfunc_udf in your PySpark code, just like you did before:
df.withColumn("result", myfunc_udf("input_text_column", "param2_column"))
Remember to replace "input_text_column" and "param2_column" with the actual column names that contain the input text and the second parameter, respectively.
02-07-2024 05:05 AM
Hi Kaniz,
I started getting the following error after using myfunc_udf with 2 parameters.
pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()
Regards,
Sanjay
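A likely cause of this ValueError: Series.apply forwards the keyword argument param2_series unchanged, so every call receives the whole Series where a scalar is expected, and a truth test on that Series fails downstream. A sketch of a fix is to pair the two Series element-wise; the myfunc below is a dummy stand-in for the real mlflowmodel.predict call, which is an assumption for illustration:

```python
import pandas as pd

# Dummy stand-in for the real myfunc / mlflowmodel.predict(input_text, param2).
def myfunc(input_text, param2):
    return f"{input_text}:{param2}"

# Pair the two Series element-wise instead of passing a whole Series
# as a keyword argument. Inside a @pandas_udf(StringType()) body the
# same pattern applies unchanged.
def myfunc_pairwise(input_text_series: pd.Series, param2_series: pd.Series) -> pd.Series:
    return pd.Series(
        [myfunc(t, p) for t, p in zip(input_text_series, param2_series)],
        index=input_text_series.index,
    )

out = myfunc_pairwise(pd.Series(["a", "b"]), pd.Series(["x", "y"]))
print(out.tolist())  # ['a:x', 'b:y']
```

Preserving the index of input_text_series keeps the output aligned with the input rows, which is what a scalar pandas UDF requires.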
02-07-2024 01:40 AM
I have another brief question: I need to send two arguments to myfunc. I need some guidance on how to pass in multiple parameters.
02-07-2024 06:12 AM
Hello Sanjay,
Could you please share your code snippet as per the latest changes?
02-07-2024 07:26 AM - edited 02-07-2024 07:39 AM
pythonException: 'ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()
02-07-2024 09:25 PM
Hello Sanjay,
The above code doesn't have df defined. Can you share your df.show() output?
03-18-2024 07:19 PM
So good