Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Path of artifacts not found error in pyfunc.load_model using pyfunc wrapper

Octavian1
Contributor

Hi,

I am using a pyfunc wrapper for a PySpark model (which also involves a pipeline) that I want to register with MLflow.

Steps I followed:

1. Pipeline and model serialization and logging (serialized locally to a Volume; the logging itself is performed in DBFS)

import mlflow

# pipeline serialization and logging
pipeline_path = '/Volumes/my_catalog/my_db/my_volume/my_pipeline'
my_pipeline.write().overwrite().save(pipeline_path)
with mlflow.start_run():
    mlflow.log_artifact(pipeline_path, 'pipeline')

# pyspark model serialization and logging
model_path = '/Volumes/my_catalog/my_db/my_volume/my_model'
my_model.write().overwrite().save(model_path)
with mlflow.start_run():
    mlflow.log_artifact(model_path, 'model')

2. Wrapper definition, using the serialized pipeline and model through the context

import mlflow.pyfunc
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

class ModelWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.spark = SparkSession.builder.getOrCreate()
        self.pipeline = PipelineModel.load(context.artifacts["my_pipeline"])
        self.model = PipelineModel.load(context.artifacts["my_model"])

    def predict(self, context, input):
        input_df = self.pipeline.transform(input)
        trans = self.model.transform(input_df)
        return trans.collect()[0]['prediction']

3. Log the pyfunc wrapper with MLflow

import cloudpickle
import pyspark  # needed for the pyspark version pin below
from mlflow.models import infer_signature

mlflow.set_registry_uri("databricks-uc")

catalog = 'my_catalog'
db = 'my_db'
model_name = f"{catalog}.{db}.my_spark_model"

with mlflow.start_run(run_name="my_run") as run:
    input_ex = input
    output_ex = 5.0
    signature = infer_signature(input_ex, output_ex)
    artifacts = {
        "my_pipeline": "dbfs:/databricks/mlflow-tracking/<run_id_1>/2b398/artifacts/my_pipeline/pipeline",
        "my_model": "dbfs:/databricks/mlflow-tracking/<run_id_2>/9a4f51/artifacts/my_model/model"
    }
    model_info = mlflow.pyfunc.log_model(
        python_model=ModelWrapper(),
        artifacts=artifacts,
        artifact_path="my_spark_model",
        registered_model_name=model_name,
        pip_requirements=[
            "mlflow==" + mlflow.__version__,
            "cloudpickle==" + cloudpickle.__version__,
            "pyspark==" + pyspark.__version__
        ],
        input_example=input_ex,
        signature=signature
    )

All of the 3 steps above ran OK.

After the last one, `pipeline` was correctly logged at 

dbfs:/databricks/mlflow-tracking/<run_id_3>/123a45/artifacts/my_spark_model/artifacts/pipeline

(including metadata, etc.)

whereas `model` was logged at

dbfs:/databricks/mlflow-tracking/<run_id_3>/123a45/artifacts/my_spark_model/artifacts/model

(including metadata, etc.)
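
For reference, one way to double-check that layout is to list the run's artifacts with the MLflow client (a minimal sketch; <run_id_3> stands for the run id of the logging run above):

from mlflow import MlflowClient

client = MlflowClient()
# list everything logged under the pyfunc model's artifact path for this run
for f in client.list_artifacts("<run_id_3>", path="my_spark_model/artifacts"):
    print(f.path, f.is_dir)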

The issue happens when trying to load the pyfunc wrapper:

import mlflow
from mlflow import MlflowClient

client = MlflowClient()
latest_model = client.get_model_version_by_alias(model_name, "prod")
logged_model = f'runs:/{latest_model.run_id}/my_spark_model'
loaded_model = mlflow.pyfunc.load_model(logged_model)

which fails with the error:

Py4JJavaError: An error occurred while calling o11140.partitions. : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: /local_disk0/repl_tmp_data/ReplId-46fea-18625-1d618-7/tmpkxct3u4m/my_spark_model/artifacts/model/metadata

Is the problem that the workers do not have access to the artifact path, or that the artifact path information has not been broadcast to them? Or is it something different?
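
A rough way to test that hypothesis (just a sketch, reusing the path from the error above and the notebook's built-in spark session):

import os

# path taken from the error message above
p = "/local_disk0/repl_tmp_data/ReplId-46fea-18625-1d618-7/tmpkxct3u4m/my_spark_model/artifacts/model"

# driver-local view: expected to exist while the temp directory is still alive
print(os.path.exists(p + "/metadata"))

# Spark's ML readers load metadata via sc.textFile, which resolves the path
# against the cluster's default filesystem rather than the driver's local disk,
# so this should reproduce the same InvalidInputException
spark.sparkContext.textFile(p + "/metadata").first()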

Does anybody have a clue how to solve it?

Basically, the goal is to be able to use serialized Spark model artifacts in a pyfunc wrapper registered with MLflow on Databricks, using Unity Catalog.

Thanks!


2 REPLIES

Octavian1
Contributor

I think there is an issue with the DB community editor formatting, specifically when using Python code blocks.

pikapika
New Contributor II

I'm stuck with the same issue, but I managed to load it (I was looking to serve it using Model Serving as well).
One thing I noticed is that we can use mlflow.create_experiment() at the beginning and specify the default artifact location parameter as a DBFS path (https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.create_experiment).

At least then load_model looks in the DBFS path. However, it still looks at the incorrect path. One hack to resolve this is to copy the already-saved pipeline model to the path the mlflow routine is looking at, using dbutils.fs.cp().
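
Roughly what I mean, as a sketch (the experiment name and DBFS paths below are made up; substitute the path that load_model actually complains about):

import mlflow

# 1) create the experiment with a DBFS default artifact location up front
exp_id = mlflow.create_experiment(
    "/Users/someone@example.com/pyfunc_wrapper_experiment",  # hypothetical experiment name
    artifact_location="dbfs:/my_artifact_root"               # hypothetical DBFS path
)
mlflow.set_experiment(experiment_id=exp_id)

# ... log the pyfunc model as in step 3 of the original post ...

# 2) if load_model still resolves the pipeline/model to a different DBFS path,
#    copy the already-saved artifacts to exactly the path it complains about
#    (dbutils is the Databricks notebook utility)
dbutils.fs.cp(
    "dbfs:/my_artifact_root/<run_id>/artifacts/my_spark_model/artifacts/model",  # where it was logged
    "dbfs:/whatever/path/load_model/is/looking/for",                             # path from the load error
    recurse=True
)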

On another note, if you're trying to serve this registered model (which I am), I don't think there is any Model Serving support for PySpark ML models.


