I am using the Databricks FeatureEngineeringClient to log my spark.ml model for batch inference. I use the ALS model on the MovieLens dataset. My dataset has three columns: user_id, item_id, and rating.
Here is my code to prepare the dataset:
fe_data = fe.create_training_set(
    df=df_ratings,
    feature_lookups=model_feature_lookups,
    label=label,
    exclude_columns=["rating_date_month", "rating_date_dayofmonth", "timestamp"],
)
df_data = fe_data.load_df()
df_data = df_data.na.drop()
(df_train, df_test) = df_data.randomSplit([0.75, 0.25], SEED)
Note: both df_ratings and my feature table contain the item_id and user_id columns.
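For context, model_feature_lookups (passed to create_training_set above) is defined roughly like the sketch below; the feature table names are placeholders, only the lookup keys match my actual setup.

from databricks.feature_engineering import FeatureLookup

# Sketch: look up user- and item-level features by the shared keys (table names are placeholders)
model_feature_lookups = [
    FeatureLookup(
        table_name=f"{catalog_name}.{model_schema}.user_features",  # placeholder table name
        lookup_key="user_id",
    ),
    FeatureLookup(
        table_name=f"{catalog_name}.{model_schema}.item_features",  # placeholder table name
        lookup_key="item_id",
    ),
]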
Then I train and log my model as follows:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
from mlflow.models.signature import infer_signature
from mlflow.tracking.client import MlflowClient
from databricks.feature_engineering import FeatureEngineeringClient

mlflow.set_registry_uri("databricks-uc")
fe = FeatureEngineeringClient()

best_params = {}
best_params["REG_PARAM"] = 0.01
best_params["RANK"] = 2

with mlflow.start_run(run_name="ALS_final_model") as run:
    # split_data() returns the feature-store training set and the dataframes prepared above
    fe_full_data, df_full_data, df_train, df_test = split_data()

    als = ALS()
    als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("RANK", best_params["RANK"])
    mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

    model = als.fit(df_train)
    model.setColdStartStrategy("drop")
    predictions = model.transform(df_train)

    # Log the fitted ALSModel with the feature-store metadata and register it in Unity Catalog
    model_info = fe.log_model(
        model=model,
        artifact_path=model_name,
        flavor=mlflow.spark,
        training_set=fe_full_data,
        conda_env=mlflow.spark.get_default_conda_env(),
        registered_model_name=f"{catalog_name}.{model_schema}.{model_name}_fs",
    )

    evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mlflow.log_metric("rmse", rmse)
After that I register my model in Unity Catalog. For batch inference I use the following code:
model_uri = f"{catalog_name}.{model_schema}.{model_name}_fs"
model_version_uri = f"models:/{model_uri}@champion"
predictions_df = fe.score_batch(model_uri=model_version_uri, df=df_train)
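For completeness, the @champion alias used in model_version_uri is assigned roughly as sketched below; the version number is illustrative.

from mlflow.tracking import MlflowClient

client = MlflowClient()
# Point the "champion" alias at the registered UC model version (version 1 is just an example)
client.set_registered_model_alias(
    name=f"{catalog_name}.{model_schema}.{model_name}_fs",
    alias="champion",
    version=1,
)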
Here I get the following warning:
2024/08/14 12:09:18 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
2024/08/14 12:09:18 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
First question: it seems MLflow is loading the model with the `pyfunc` flavor although I logged it with the `mlflow.spark` flavor. Why is that?
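A way to double-check which flavors were actually logged (a sketch, reusing the model_version_uri defined above):

from mlflow.models import get_model_info

# Prints the flavors recorded in the MLmodel file, e.g. dict_keys(['spark', 'python_function'])
info = get_model_info(model_version_uri)
print(info.flavors.keys())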
When I try to run `display(predictions_df)` I get the following error (truncated log):
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
return cloudpickle.loads(obj, encoding=encoding)
OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/databricks/spark/python/pyspark/worker.py", line 1631, in read_udfs
arg_offsets, udf = read_single_udf(
File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
command = serializer._read_with_length(file)
pyspark.serializers.SerializationError: Caused by OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'
File "<command-xxxxxx>", line 1, in <module>
predictions_df
It seems I am facing an issue when running the batch-scoring job with PySpark on Databricks: the job throws an OSError related to an I/O failure while deserializing objects with cloudpickle, which leads to a SerializationError.
Here is how my model is registered in the MLflow tracking UI: