<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: FeatureEngineeringClient failing to run inference with mlflow.spark flavor in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/83115#M3579</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97748"&gt;@MohsenJ&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;Can you try using the mlflow.pyfunc flavor?&lt;/P&gt;</description>
    <pubDate>Thu, 15 Aug 2024 16:44:30 GMT</pubDate>
    <dc:creator>KumaranT</dc:creator>
    <dc:date>2024-08-15T16:44:30Z</dc:date>
    <item>
      <title>FeatureEngineeringClient failing to run inference with mlflow.spark flavor</title>
      <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/82978#M3578</link>
      <description>&lt;P&gt;I am using the Databricks FeatureEngineeringClient to log my spark.ml model for batch inference. I use the ALS model on the MovieLens dataset. My dataset has three columns: &lt;EM&gt;user_id&lt;/EM&gt;, &lt;EM&gt;item_id&lt;/EM&gt;&amp;nbsp;and &lt;EM&gt;rank&lt;/EM&gt;.&lt;/P&gt;&lt;P&gt;Here is my code to prepare the dataset:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;fe_data = fe.create_training_set(df=df_ratings, feature_lookups=model_feature_lookups, label=label, exclude_columns=["rating_date_month", "rating_date_dayofmonth", "timestamp"])
df_data = fe_data.load_df()
df_data = df_data.na.drop()
(df_train, df_test) = df_data.randomSplit([0.75, 0.25], SEED)&lt;/LI-CODE&gt;&lt;P&gt;Note: both &lt;EM&gt;df_ratings&lt;/EM&gt; and &lt;EM&gt;my feature table&lt;/EM&gt; have the &lt;EM&gt;item_id&lt;/EM&gt; and &lt;EM&gt;user_id&lt;/EM&gt; columns.&lt;/P&gt;&lt;P&gt;Then I train and log my model as follows:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.ml.evaluation import RegressionEvaluator  # needed for the RMSE evaluation
from pyspark.ml.recommendation import ALS

import mlflow
from mlflow.models.signature import infer_signature
from mlflow.tracking.client import MlflowClient
from databricks.feature_engineering import FeatureEngineeringClient

mlflow.set_registry_uri("databricks-uc")
fe = FeatureEngineeringClient()

best_params = {"REG_PARAM": 0.01, "RANK": 2}

with mlflow.start_run(run_name="ALS_final_model") as run:
    fe_full_data, df_full_data, df_train, df_test = split_data()

    # Configure the ALS estimator (each setter returns the same estimator)
    als = ALS()
    als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("RANK", best_params["RANK"])
    mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

    model = als.fit(df_train)
    # Drop rows with NaN predictions (users or items unseen during training)
    model.setColdStartStrategy('drop')
    predictions = model.transform(df_train)

    # Log the model with its feature lookups and register it under the given name
    model_info = fe.log_model(
        model=model,
        artifact_path=model_name,
        flavor=mlflow.spark,
        training_set=fe_full_data,
        conda_env=mlflow.spark.get_default_conda_env(),
        registered_model_name=f"{catalog_name}.{model_schema}.{model_name}_fs",
    )

    # RMSE is computed on the training-set predictions
    evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mlflow.log_metric('rmse', rmse)&lt;/LI-CODE&gt;&lt;P&gt;After that I register my model in Unity Catalog (UC). For batch inference I use the following code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;model_uri = f"{catalog_name}.{model_schema}.{model_name}_fs"
model_version_uri = f"models:/{model_uri}@champion"
predictions_df = fe.score_batch(model_uri=model_version_uri, df=df_train)&lt;/LI-CODE&gt;&lt;P&gt;Here I get the following warning:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;2024/08/14 12:09:18 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.

2024/08/14 12:09:18 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'&lt;/LI-CODE&gt;&lt;P&gt;&lt;STRONG&gt;First question&lt;/STRONG&gt;: it seems MLflow is loading the model with the `pyfunc` flavor although I logged it with the spark flavor. Why is that?&lt;/P&gt;&lt;P&gt;When I try to run &lt;EM&gt;display(predictions_df)&lt;/EM&gt; I get the following error (truncated log):&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
return cloudpickle.loads(obj, encoding=encoding)
OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 1964, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/databricks/spark/python/pyspark/worker.py", line 1631, in read_udfs
arg_offsets, udf = read_single_udf(
File "/databricks/spark/python/pyspark/worker_util.py", line 70, in read_command
command = serializer._read_with_length(file)
pyspark.serializers.SerializationError: Caused by OSError: [Errno 5] Input/output error: '/path/to/your/notebooks'

File "&amp;lt;command-xxxxxx&amp;gt;", line 1, in &amp;lt;module&amp;gt;
predictions_df&lt;/LI-CODE&gt;&lt;P&gt;It seems the batch scoring job fails on the PySpark workers: an OSError (I/O failure) is raised while cloudpickle deserializes the UDF, and it surfaces as a SerializationError.&lt;/P&gt;&lt;P&gt;Here is how my model is registered in MLflow tracking:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="MohsenJ_0-1723641930280.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/10336iB91C9B7DB0990160/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="MohsenJ_0-1723641930280.png" alt="MohsenJ_0-1723641930280.png" /&gt;&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 14 Aug 2024 13:25:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/82978#M3578</guid>
      <dc:creator>MohsenJ</dc:creator>
      <dc:date>2024-08-14T13:25:50Z</dc:date>
    </item>
    <item>
      <title>Re: FeatureEngineeringClient failing to run inference with mlflow.spark flavor</title>
      <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/83115#M3579</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97748"&gt;@MohsenJ&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;Can you try using the mlflow.pyfunc flavor?&lt;/P&gt;</description>
      <pubDate>Thu, 15 Aug 2024 16:44:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/83115#M3579</guid>
      <dc:creator>KumaranT</dc:creator>
      <dc:date>2024-08-15T16:44:30Z</dc:date>
    </item>
    <item>
      <title>Re: FeatureEngineeringClient failing to run inference with mlflow.spark flavor</title>
      <link>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/83178#M3583</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/115880"&gt;@KumaranT&lt;/a&gt;&amp;nbsp;I already tried that, with the same result:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import mlflow.pyfunc

# Load the model as a PyFuncModel
model = mlflow.pyfunc.load_model(model_uri=model_version_uri)

# Create a Spark UDF for scoring
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_version_uri)

# Apply the UDF to the DataFrame, passing every column as model input
predictions_df = df_train.withColumn("predictions", predict_udf(*df_train.columns))&lt;/LI-CODE&gt;</description>
      <pubDate>Fri, 16 Aug 2024 06:45:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/featureengineeringclient-failing-to-run-inference-with-mlflow/m-p/83178#M3583</guid>
      <dc:creator>MohsenJ</dc:creator>
      <dc:date>2024-08-16T06:45:42Z</dc:date>
    </item>
  </channel>
</rss>

