Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

FeatureEngineeringClient failing to run inference with mlflow.spark flavor

MohsenJ
Contributor

I am using the Databricks FeatureEngineeringClient to log my spark.ml model for batch inference. I use the ALS model on the MovieLens dataset. My dataset has three columns: user_id, item_id, and rating.

Here is my code to prepare the dataset:

fe_data = fe.create_training_set(
    df=df_ratings,
    feature_lookups=model_feature_lookups,
    label=label,
    exclude_columns=["rating_date_month", "rating_date_dayofmonth", "timestamp"],
)
df_data = fe_data.load_df()
df_data = df_data.na.drop()
(df_train, df_test) = df_data.randomSplit([0.75, 0.25], SEED)


Note: both df_ratings and my feature table contain the item_id and user_id columns.
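
For context, model_feature_lookups is a list of FeatureLookup objects along these lines (a minimal sketch; the table and column names below are placeholders, not my actual feature tables):

from databricks.feature_engineering import FeatureLookup

# Hypothetical feature tables, shown only to illustrate the shape of
# model_feature_lookups; substitute your own Unity Catalog tables.
model_feature_lookups = [
    FeatureLookup(
        table_name=f"{catalog_name}.{model_schema}.user_features",  # placeholder
        lookup_key=["user_id"],
    ),
    FeatureLookup(
        table_name=f"{catalog_name}.{model_schema}.item_features",  # placeholder
        lookup_key=["item_id"],
    ),
]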

Then I train and log my model as follows:

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

import mlflow
from mlflow.models.signature import infer_signature
from mlflow.tracking.client import MlflowClient
from databricks.feature_engineering import FeatureEngineeringClient

mlflow.set_registry_uri("databricks-uc")
fe = FeatureEngineeringClient()

best_params = {}
best_params["REG_PARAM"] = 0.01
best_params["RANK"] = 2

with mlflow.start_run(run_name="ALS_final_model") as run:
    fe_full_data, df_full_data, df_train, df_test = split_data()
    als = ALS()
    als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("RANK", best_params["RANK"])
    mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

    model = als.fit(df_train)
    model.setColdStartStrategy('drop')
    predictions = model.transform(df_train)

    model_info = fe.log_model(
        model=model,
        artifact_path=model_name,
        flavor=mlflow.spark,
        training_set=fe_full_data,
        conda_env=mlflow.spark.get_default_conda_env(),
        registered_model_name=f"{catalog_name}.{model_schema}.{model_name}_fs",
    )

    evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mlflow.log_metric('rmse', rmse)




After that I register my model in Unity Catalog and set a champion alias on it. For batch inference I use the following code:

model_uri = f"{catalog_name}.{model_schema}.{model_name}_fs"
model_version_uri = f"models:/{model_uri}@champion"
predictions_df = fe.score_batch(model_uri=model_version_uri, df=df_train)
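
For completeness, the champion alias referenced in the URI was assigned to the registered model version beforehand, along these lines (a sketch; the version number is a placeholder):

from mlflow.tracking.client import MlflowClient

client = MlflowClient()
# Point the "champion" alias at a specific model version (version "1" is a placeholder).
client.set_registered_model_alias(
    name=f"{catalog_name}.{model_schema}.{model_name}_fs",
    alias="champion",
    version="1",
)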


When I run score_batch, I get the following warning:

2024/08/14 12:09:18 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.

2024/08/14 12:09:18 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
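
(For reference, the env_manager option the warning mentions is a parameter of mlflow.pyfunc.spark_udf. When loading the model manually rather than through score_batch, it can be set like this; a sketch, assuming model_version_uri from above:)

import mlflow.pyfunc

# Recreate the training environment for inference instead of using the local one,
# as the warning suggests; applies when calling spark_udf directly.
predict_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri=model_version_uri,
    env_manager="conda",
)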


First question: it seems MLflow is loading the model with the `pyfunc` flavor although I registered it with the spark flavor. Why is that?

When I try to run display(predictions_df) I get the following error (truncated log):

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/databricks/spark/python/pyspark/worker.py", line 1631, in read_udfs
    arg_offsets, udf = read_single_udf(
  File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
    command = serializer._read_with_length(file)
pyspark.serializers.SerializationError: Caused by OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

  File "<command-xxxxxx>", line 1, in <module>
    predictions_df

It seems I am facing an issue when running a batch scoring job with PySpark on Databricks: the job throws an OSError for an I/O failure while deserializing objects. The error occurs when cloudpickle tries to load objects on the workers, and it surfaces as a SerializationError.
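
To separate the pyfunc wrapper from the native flavor, one thing I can try is loading the spark flavor directly and calling transform without score_batch (a debugging sketch; note this bypasses the feature-table lookups, so the input DataFrame must already contain every column the model expects):

import mlflow.spark

# Load the native spark.ml flavor directly, bypassing the pyfunc wrapper.
# Unlike fe.score_batch, this performs no feature lookups.
spark_model = mlflow.spark.load_model(model_version_uri)
native_predictions = spark_model.transform(df_train)
display(native_predictions)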

Here is how my model is registered in MLflow tracking:

[Screenshot: MohsenJ_0-1723641930280.png]

2 REPLIES

KumaranT
New Contributor III

Hi @MohsenJ ,

Can you try using the mlflow.pyfunc flavor?

MohsenJ
Contributor

@KumaranT I already tried that, with the same result:

import mlflow.pyfunc

# Load the model as a PyFuncModel
model = mlflow.pyfunc.load_model(model_uri=model_version_uri)

# Create a Spark UDF for scoring
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_version_uri)

# Apply the UDF to the DataFrame
predictions_df = df_train.withColumn("predictions", predict_udf(*df_train.columns))
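
A variation I can also test is passing only the model's input columns to the UDF instead of every column (a sketch, assuming COL_USER and COL_ITEM are the only inputs the model needs):

from pyspark.sql.functions import struct

# Pass just the user/item columns the ALS model was trained on,
# as a single struct argument, rather than *df_train.columns.
predictions_df = df_train.withColumn(
    "predictions",
    predict_udf(struct(COL_USER, COL_ITEM)),
)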
