
Error on pandas udf usage in databricks, sc.broadcasting random forest loaded from Kedro MLFlow Logger DataSet, cannot pickle '_thread.RLock' object

ryojikn
New Contributor III

I'm trying to broadcast a random forest (sklearn 1.2.0) that was just loaded from MLflow, and to score data with it through a Pandas UDF.

However, the same code works perfectly on Spark 2.4 on our on-prem cluster.

I thought it was due to the Spark 2.4 to 3 changes, probably some breaking change in the Pandas UDF API. However, I've switched to the newer template and the same behavior still happens.

schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType()), False)
schema.add(StructField("predict_proba", FloatType()), False)

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df

The code above is fairly simple and should work as it does on older versions: it works perfectly on Spark 2.4, and switching to the new Pandas UDF options made no difference on PySpark 3.

It's giving me the following error message:

in predict_with_regression
    output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 86, in apply
    return self.applyInPandas(udf.func, schema=udf.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 201, in applyInPandas
    udf_column = udf(*[df[col] for col in df.columns])
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/rdd.py", line 2850, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/serializers.py", line 483, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

I honestly have no clue. I even tried the threading release functionality, but it tells me that neither the random forest object nor the broadcasted one is locked, so the release can't be done.
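On the release attempt: the traceback shows the TypeError being raised while PySpark serializes the UDF on the driver (`ser.dumps(command)`), and pickle rejects a `_thread.RLock` whether or not it is currently held, so releasing it would not be expected to help. A minimal standard-library sketch of that behavior, with no Spark involved (`pickling_error` is a hypothetical helper name used only for this illustration):

```python
import pickle
import threading


def pickling_error(obj):
    """Return the TypeError message pickle raises for obj, or None if it pickles."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


lock = threading.RLock()  # never acquired, so there is nothing to release

# Attempt to pickle the lock while it is not held.
unlocked_error = pickling_error(lock)

# Attempt again while the lock is held by the current thread.
with lock:
    locked_error = pickling_error(lock)

print("not held:", unlocked_error)
print("held:    ", locked_error)
```

Both attempts fail in the same way, which suggests looking for what drags a lock into the serialized function rather than at the lock's state.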

Any ideas how to fix this RLock situation?

No custom code has been added, just PySpark 3.2 and scikit-learn 1.2.0; that is, the model isn't wrapped in any custom class.
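One way to narrow this down is to list the names the UDF body references and check whether each one can be pickled. Note that `udf_predict` above uses `input_df` inside its body; if that is a Spark DataFrame, serializing the function may pull in driver-side state, which is a possible but unconfirmed source of the RLock. The sketch below uses only the standard library. `unpicklable_references` is a hypothetical helper, and a `SimpleNamespace` holding a lock stands in for the DataFrame, so this illustrates the diagnostic idea rather than PySpark's own cloudpickle-based serializer.

```python
import inspect
import pickle
import threading
from types import SimpleNamespace


def unpicklable_references(func):
    """Map each global or nonlocal name used by func to the pickling error it raises."""
    refs = inspect.getclosurevars(func)
    failures = {}
    for name, value in {**refs.globals, **refs.nonlocals}.items():
        try:
            pickle.dumps(value)
        except Exception as exc:
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Assumption: a stand-in for a driver-side object that owns a lock somewhere inside it.
input_df = SimpleNamespace(columns=["ID", "f1", "f2"], lock=threading.RLock())

# Plain list of column names computed once, outside the function.
feature_columns = [name for name in input_df.columns if name != "ID"]


def select_with_dataframe(row):
    # Mirrors the pattern in the post: the body touches input_df itself.
    wanted = input_df.columns
    return {name: row[name] for name in wanted if name != "ID"}


def select_with_column_list(row):
    # References only the plain list, not the object that owns the lock.
    wanted = feature_columns
    return {name: row[name] for name in wanted}


bad = unpicklable_references(select_with_dataframe)
good = unpicklable_references(select_with_column_list)
print(bad)
print(good)
```

If the same pattern applies here, computing the column list once on the driver (for example `feature_columns = input_df.drop("ID").columns`) and using that list inside `udf_predict` would keep the DataFrame out of the function's references.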

1 REPLY

ryojikn
New Contributor III

Anyone?
