09-11-2022 07:49 AM
Context: I am using pyspark.pandas in a Databricks Jupyter notebook and doing some text manipulation within the dataframe.
pyspark.pandas is the pandas API on Spark and can largely be used the same way as classic pandas.
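For illustration, the kind of drop-in usage I mean looks roughly like this (the column name is made up, not my actual data):

    import pyspark.pandas as ps

    # Hypothetical example -- the real script does similar string cleaning
    psdf = ps.DataFrame({"text": ["Hello World", "Spark AND pandas"]})
    psdf["text_clean"] = psdf["text"].str.lower().str.strip()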
Error: PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Some clues that may help you understand the error:
I do not get any error if I run my script on a smaller subset of the data.
I do get the error if I run my script on the larger dataset (about 3,000 rows).
This makes me think the error is not code-specific; rather, Databricks/pyspark.pandas might have an intricacy/limitation/bug that shows up with a higher number of rows of data (3,000 in my case).
Can somebody please explain why I am getting this error and how to resolve it?
I would appreciate it if we stick to pyspark.pandas and do not go into alternatives that suggest using Spark SQL.
If you want to look at the full stack trace for the error, you can check the code snippet in the question I have asked.
09-13-2022 02:52 AM
pyspark.pandas can indeed be used instead of classic pandas. That being said, there are some caveats. If you just copy/paste your Python code, chances are you are not using the parallel processing capabilities of Spark (distributed computing). Iterating over the values of a structure, for example, will still execute on a single node.
So, to be able to scale out your code, you should use pyspark (pandas-on-Spark is fine) and think in vectors/sets instead of records.
Instead of iterating over records, try to apply a function to the whole set in one go; that is why Spark was invented.
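As a rough sketch (the column is a made-up example), compare record-by-record iteration with a single vectorized expression that Spark can distribute:

    import pyspark.pandas as ps

    psdf = ps.DataFrame({"value": list(range(1_000_000))})

    # Anti-pattern: iterating records executes on a single node
    # for idx, row in psdf.iterrows():
    #     ...  # per-record work, no parallelism

    # Set-based: one vectorized expression, distributed by Spark
    psdf["doubled"] = psdf["value"] * 2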
09-13-2022 11:24 AM
Hi @werners, thanks for your response.
As a beginner, I would like to use pyspark.pandas as plug-and-play (by converting my classic pandas code to pyspark.pandas).
Would you know why I am getting the error (mentioned in the question)?
It's really peculiar that it happens only with larger datasets.
Do you recommend I raise an issue with Databricks?
10-10-2022 01:06 AM
Hi @Krishna Zanwar, you can raise a support ticket with Databricks here.
01-14-2023 09:06 PM
@Krishna Zanwar, I'm receiving the same error.
For me, it happens when broadcasting a random forest (scikit-learn 1.2.0) freshly loaded from MLflow and using a pandas UDF to run the model's predictions.
However, the same code works perfectly on Spark 2.4 on our on-prem cluster.
I thought it was due to the Spark 2.4 to 3 changes, probably some breaking change in the pandas UDF API; however, I've switched to the newer template and the same behavior still happens.
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, IntegerType, FloatType

# Output schema: input columns (minus ID) plus the two prediction columns.
# nullable=False is the third StructField argument, not an argument to add().
schema = input_df.drop("ID").schema
schema.add(StructField("predict", IntegerType(), False))
schema.add(StructField("predict_proba", FloatType(), False))

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def udf_predict(df: pd.DataFrame) -> pd.DataFrame:
    # Keep only the feature columns, in the order the model expects
    df = df[list(input_df.drop("ID").columns)]
    y_pred = broadcasted_model.value.predict(df)
    if hasattr(broadcasted_model.value, "predict_proba"):
        prediction_scores = broadcasted_model.value.predict_proba(df)
        df["predict_proba"] = prediction_scores[:, -1]
    df["predict"] = y_pred
    return df
The code above is fairly simple, with nothing unusual in it, and I'm getting the same error as Krishz:
in predict_with_regression
output_df = partitioned_df.groupby("scoring_partition").apply(udf_predict)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 86, in apply
return self.applyInPandas(udf.func, schema=udf.returnType)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/pandas/group_ops.py", line 201, in applyInPandas
udf_column = udf(*[df[col] for col in df.columns])
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 199, in wrapper
return self(*args)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 177, in __call__
judf = self._judf
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 161, in _judf
self._judf_placeholder = self._create_judf()
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 170, in _create_judf
wrapped_func = _wrap_function(sc, self.func, self.returnType)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/sql/udf.py", line 34, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/rdd.py", line 2850, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/conda/envs/kedro-new-runtime/lib/python3.8/site-packages/pyspark/serializers.py", line 483, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
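Re-reading the snippet, one thing stands out (this is only a hunch, not a confirmed fix): udf_predict references input_df, a Spark DataFrame, from the enclosing scope. When Spark pickles the UDF it has to pickle everything the closure captures, and a Spark DataFrame carries JVM handles and locks, which would explain the "cannot pickle '_thread.RLock' object" message. A minimal sketch of the workaround, capturing only a plain list of column names and using the Spark 3 applyInPandas template (schema and broadcasted_model as defined above):

    import pandas as pd

    # Capture only plain Python objects in the closure, never the DataFrame itself
    feature_cols = [c for c in input_df.columns if c != "ID"]

    def predict_fn(pdf: pd.DataFrame) -> pd.DataFrame:
        pdf = pdf[feature_cols]
        y_pred = broadcasted_model.value.predict(pdf)
        if hasattr(broadcasted_model.value, "predict_proba"):
            pdf["predict_proba"] = broadcasted_model.value.predict_proba(pdf)[:, -1]
        pdf["predict"] = y_pred
        return pdf

    # Spark 3 replacement for the GROUPED_MAP pandas_udf
    output_df = partitioned_df.groupby("scoring_partition").applyInPandas(predict_fn, schema=schema)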