cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

[Pyspark.Pandas] PicklingError: Could not serialize object (this error is happening only for large datasets)

KrishZ
Contributor

Context: I am using pyspark.pandas in a Databricks jupyter notebook and doing some text manipulation within the dataframe..

pyspark.pandas is the Pandas API on Spark and can be used exactly the same as usual Pandas

Error: PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

Some clues that can help you understand the error:

I do not get any error if I run my script on:

  • 300 rows of data.
  • 600 rows of data (created by replicating the original 300 x2)

I get an error if I run my script on :

  • On 3000 rows of data (created by replicating the original 300 x10)
  • On 3000 unique rows (not related to the original 300 rows)

This makes me think the error is not code specific rather databricks/pyspark.pandas might have an intricacy / limitation / bug which happens with higher number of rows of data(3000 in my case)

Can somebody please explain why I am getting this error and how to resolve it?

Would appreciate if we stick to pyspark.pandas and not go into alternatives that suggest using spark sql..

If you want to look at the full stack trace for the error , you may check the code snippet you see in this question which I have asked.

4 REPLIES 4

-werners-
Esteemed Contributor III

pyspark pandas can indeed be used instead of classic pandas. However that being said, there are some caveats. If you just copy/paste your python code, chances are real you are not using the parallel processing capabilities of spark (distributed computing). For example iterating over values of a structure. this will still execute on a single node.

So to be able to scale out your code, you should use pyspark (can be pandas) and think in vectors/sets instead of records.

Instead of iterating over records, try to apply a function in one go. that is why spark was invented.

Hi @werners , thanks for your response.

As a beginner , I would like to use pyspark.pandas as a plug and play. (by converting my classic pandas code to pyspark.pandas ).

Would you know why I am getting the error (mentioned in the question)?

It's really peculiar that it happens only with larger datasets..

Do you recommend I raise an issue with Databricks ?

Kaniz
Community Manager
Community Manager

Hi @Krishna Zanwar​, You can raise a support ticket with Databricks here.

ryojikn
New Contributor III

@Krishna Zanwar​ , i'm receiving the same error.

For me, the behavior is when trying to broadcast a random forest (sklearn 1.2.0) recently loaded from mlflow, and using Pandas UDF to predict a model.

However, the same code works perfectly on Spark 2.4 + our OnPrem cluster.

I thought it was due to Spark 2.4 to 3 changes, and probably some breaking changes related to Pandas UDF API, however i've changed to the newer template and same behavior still happens.

schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType()), False)
schema.add(StructField("predict_proba", FloatType()), False)
    
    
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df

The code above is fairly simple, without any secrets and I'm having the same error as Krishz

in predict_with_regression
    output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 86, in apply
    return self.applyInPandas(udf.func, schema=udf.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 201, in applyInPandas
    udf_column = udf(*[df[col] for col in df.columns])
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 199, in wrapper
    return self(*args)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 177, in __call__
    judf = self._judf
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 161, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/rdd.py", line 2850, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/serializers.py", line 483, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!