@Krishna Zanwar, I'm receiving the same error.
In my case, it happens when broadcasting a random forest (scikit-learn 1.2.0) freshly loaded from MLflow and then calling it from a Pandas UDF for prediction.
However, the same code works perfectly on Spark 2.4 on our on-prem cluster.
I suspected it was caused by the Spark 2.4 → 3.x migration, probably some breaking change in the Pandas UDF API, but after switching to the newer Pandas UDF style the same behavior still happens.
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, IntegerType, FloatType

# broadcasted_model is a pyspark Broadcast wrapping the model loaded from MLflow,
# created elsewhere in the pipeline.

# Output schema: input columns (minus the ID) plus the two prediction columns.
schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType(), False))
schema.add(StructField("predict_proba", FloatType(), False))

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    # Keep only the feature columns, in the order the model was trained on.
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df
The code above is fairly simple, nothing special going on, and I'm getting the same error as Krishna:
in predict_with_regression
    output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 86, in apply
    return self.applyInPandas(udf.func, schema=udf.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 201, in applyInPandas
    udf_column = udf(*[df[col] for col in df.columns])
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/rdd.py", line 2850, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/serializers.py", line 483, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
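For what it's worth, the TypeError at the bottom is plain pickle behavior: any object graph that holds a _thread.RLock somewhere (a logger, a client handle, a model wrapper) fails to serialize this way, and Spark hits it while pickling the UDF's closure. Here is a minimal stdlib reproduction, plus a small helper to locate which attribute drags the lock in — ModelWrapper and find_unpicklable are hypothetical names for illustration, not from the code above:

```python
import pickle
import threading

class ModelWrapper:
    """Stand-in for a loaded model object that (indirectly) holds a lock."""
    def __init__(self):
        self.params = {"n_estimators": 100}  # ordinary, picklable payload
        self.lock = threading.RLock()        # this is what pickle chokes on

def find_unpicklable(obj):
    """Return the names of obj's attributes that fail to pickle."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except TypeError:
            bad.append(name)
    return bad

try:
    pickle.dumps(ModelWrapper())
except TypeError as exc:
    print(exc)  # → cannot pickle '_thread.RLock' object

print(find_unpicklable(ModelWrapper()))  # → ['lock']
```

If that is what's happening here, broadcasting the bare estimator (e.g. the object returned by mlflow.sklearn.load_model rather than a pyfunc wrapper), or loading the model inside the UDF instead of broadcasting it, might sidestep the error — though I haven't confirmed that this is the cause.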