I'm trying to broadcast a random forest (sklearn 1.2.0) that was just loaded from MLflow, and use a Pandas UDF to run the model's predictions.
However, the same code works perfectly on Spark 2.4 on our on-prem cluster.
I thought it was due to the changes from Spark 2.4 to 3, probably some breaking change in the Pandas UDF API, but I switched to the newer template and the same behavior still happens.
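For context, the model is loaded and broadcast roughly like this (the model URI and variable names here are illustrative, not the exact pipeline code):

import mlflow.sklearn

# Load the fitted random forest back from MLflow
# (the "models:/..." URI is just an example)
model = mlflow.sklearn.load_model("models:/random_forest/Production")

# Broadcast it so each executor holds one read-only copy
broadcasted_model = spark.sparkContext.broadcast(model)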
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, IntegerType, FloatType

# Output schema: the input columns (minus ID) plus the two prediction columns
schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType(), False))
schema.add(StructField("predict_proba", FloatType(), False))

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    # Keep/reorder the feature columns to match the training order
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df
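The UDF is applied per scoring partition; this is the exact call that blows up, as the traceback below shows:

output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)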
The code above is fairly simple and should work as it does in older versions: it runs perfectly on Spark 2.4, and switching to the new Pandas UDF style on PySpark 3 makes no difference.
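For reference, the newer-template attempt looked roughly like this, with udf_predict defined as a plain function (no decorator) and passed to applyInPandas (a sketch, not the exact code):

# Spark 3 style: plain udf_predict function + applyInPandas
output_df = (
    partitioned_df
    .groupby("scoring_partition")
    .applyInPandas(udf_predict, schema=schema)
)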
It's giving me the following error message:
in predict_with_regression
output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 86, in apply
return self.applyInPandas(udf.func, schema=udf.returnType)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 201, in applyInPandas
udf_column = udf(*[df[col] for col in df.columns])
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 199, in wrapper
return self(*args)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 177, in __call__
judf = self._judf
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 161, in _judf
self._judf_placeholder = self._create_judf()
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
wrapped_func = _wrap_function(sc, self.func, self.returnType)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/rdd.py", line 2850, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/serializers.py", line 483, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
I honestly have no clue. I even tried the threading release functionality, but it tells me that neither the random forest object nor the broadcasted one is locked, and therefore the release can't be done.
Any ideas how to fix this RLock situation?
No custom code has been added, just PySpark 3.2 and scikit-learn 1.2.0; the model isn't wrapped in any custom class.