
Error on pandas udf usage in databricks, sc.broadcasting random forest loaded from Kedro MLFlow Logger DataSet, cannot pickle '_thread.RLock' object

New Contributor III

I'm trying to broadcast a random forest (sklearn 1.2.0) recently loaded from MLflow and use it inside a Pandas UDF to generate predictions.

On Databricks this fails, although the same code works perfectly on Spark 2.4 on our on-prem cluster.

I thought it was due to the Spark 2.4 to 3 changes, probably some breaking change in the Pandas UDF API. However, I've switched to the newer template and the same behavior still happens.

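import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import FloatType, IntegerType, StructField

# Output schema: the input features (without ID) plus the two prediction columns.
# nullable=False goes inside StructField; as a second argument to add() it is ignored.
schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType(), False))
schema.add(StructField("predict_proba", FloatType(), False))

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    # Keep only the feature columns, in the order of the input DataFrame.
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        # Keep the probability of the last class.
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df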

The code above is fairly simple and should work as it does on older versions. It runs perfectly on Spark 2.4, and even with the new pandas UDF options the behavior on PySpark 3 does not change.

It's giving me the following error message:

in predict_with_regression
    output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/", line 86, in apply
    return self.applyInPandas(udf.func, schema=udf.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/", line 201, in applyInPandas
    udf_column = udf(*[df[col] for col in df.columns])
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/", line 199, in wrapper
    return self(*args)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/", line 177, in __call__
    judf = self._judf
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/", line 170, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/", line 2850, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/", line 483, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

I honestly have no clue. I even tried releasing the lock through the threading API, but it reports that neither the random forest object nor the broadcasted one is locked, so there is nothing to release.

Any ideas how to fix this RLock situation?

No custom code has been added, just PySpark 3.2 and scikit-learn 1.2.0. In other words, the model isn't wrapped in any custom class.
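
A note on narrowing this down: the traceback fails at ser.dumps(command), which is the driver pickling the UDF function itself, so anything the function body references gets pickled along with it. One possibility, which is only an assumption and not confirmed anywhere in this post, is that a driver-side object captured by udf_predict (for example input_df, which is referenced inside the function body) holds a lock. The sketch below uses only the standard library to show how such a capture can be detected. The helper find_unpicklable_captures and the dict standing in for a driver-side handle are hypothetical names made up for illustration, and plain pickle is used here rather than the serializer PySpark uses.

```python
import inspect
import pickle
import threading


def find_unpicklable_captures(func):
    """Return {name: error} for every global or closure variable that
    func references and that plain pickle refuses to serialize."""
    captured = inspect.getclosurevars(func)
    problems = {}
    for name, value in {**captured.globals, **captured.nonlocals}.items():
        try:
            pickle.dumps(value)
        except Exception as exc:
            problems[name] = f"{type(exc).__name__}: {exc}"
    return problems


def make_udfs():
    # Hypothetical stand-in for a driver-side object that owns a lock.
    handle = {"lock": threading.RLock(), "columns": ["a", "b"]}
    # Plain list of strings extracted up front, outside the UDF body.
    feature_columns = list(handle["columns"])

    def bad_udf(rows):
        # References `handle`, so the lock travels with the function.
        return [handle["columns"], rows]

    def good_udf(rows):
        # References only the plain list, which pickles without trouble.
        return [feature_columns, rows]

    return bad_udf, good_udf


bad_udf, good_udf = make_udfs()
bad_report = find_unpicklable_captures(bad_udf)
good_report = find_unpicklable_captures(good_udf)
print(bad_report)
print(good_report)
```

If the same pattern applies here, computing the column list once outside the UDF (for example feature_columns = list(input_df.drop("ID").columns)) and referencing only that list inside udf_predict would be one thing to try. Whether that resolves this particular error is untested.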



