mlflow.exceptions.MlflowException - Invalid metric 'refreshableTokenNotFound'

amitca71
Contributor II
Hi,
We are hitting an mlflow.exceptions.MlflowException when MLflow is called from a stream: when we load the model outside the stream it loads fine, but loading it from within the stream fails with the exception below. To emphasize, this was working until recently.
compute: 14.3.x-cpu-ml-scala2.12
import mlflow.sklearn

def load_aspect_model():
    # run and artifact IDs elided by the poster
    return mlflow.sklearn.load_model("runs:/xxx/yyy")
1) Works fine:
loaded_model = load_aspect_model()

2) Raises the exception:
def batch_process(microDF, batchId):
    loaded_model = load_aspect_model()
reviews_quotes_stream = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("s3a://bucket-xxx/xxx")

reviews_quotes_stream = reviews_quotes_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/garbage") \
    .foreachBatch(batch_process) \
    .start()
File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) File "/databricks/spark/python/pyspark/sql/utils.py", line 122, in call raise e File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) File "/root/.ipykernel/11277/command-4409721816615086-1353001946", line 2, in batch_process loaded_model=load_aspect_model() File "/root/.ipykernel/11277/command-4409721816614710-1735106079", line 12, in load_aspect_model return (mlflow.sklearn.load_model(f"runs:/xxx/yyy")) File "/databricks/python/lib/python3.10/site-packages/mlflow/sklearn/__init__.py", line 613, in load_model local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path) File "/databricks/python/lib/python3.10/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri return get_artifact_repository(artifact_uri=root_uri).download_artifacts( File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/runs_artifact_repo.py", line 125, in download_artifacts return self.repo.download_artifacts(artifact_path, dst_path) File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/artifact_repo.py", line 222, in download_artifacts raise MlflowException( mlflow.exceptions.MlflowException: The following failures occurred while downloading one or more artifacts from dbfs:/databricks/mlflow-tracking/56775009587862/c339b9cea76e44dd94043b91cc4fb42f/artifacts: ##### File category_aspects_model_1185_en_us/model.pkl #####
##### File category_aspects_model_1185_en_us/requirements.txt ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/conda.yaml ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/MLmodel ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/python_env.yaml ##### Invalid metric 'refreshableTokenNotFound' at py4j.Protocol.getReturnValue(Protocol.java:476) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) at com.sun.proxy.$Proxy165.call(Unknown Source) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:136) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:163) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:136) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:530) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:88) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1406) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1207) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1406) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:340) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:612) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:238) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:159) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1766) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1734) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:729) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:689) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:683) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:83) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:71) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:128) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:141) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:451) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:445) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:396) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:280)
 
Any idea?
thanks

Kaniz
Community Manager

Hi @amitca71, it looks like you're having trouble with an `MlflowException` when trying to load a model from within a stream.

Here's a simplified explanation of the issue and some steps to troubleshoot it:

**Issue**: The model loads fine when called directly on the driver, but when `mlflow.sklearn.load_model` runs inside `foreachBatch`, every artifact download from `dbfs:/databricks/mlflow-tracking/...` fails with `Invalid metric 'refreshableTokenNotFound'`. The error name suggests the temporary token used to read the artifact store is missing or cannot be refreshed from within the streaming micro-batch context.

**Solution**:

1. **Check Environment**: Make sure the environment inside and outside the stream is the same (runtime version, installed libraries, and any Spark or credential configuration).

2. **Load the Model Once**: Since the load succeeds outside the stream, load the model on the driver before starting the stream and reuse it inside `batch_process`, instead of re-downloading the artifacts in every micro-batch (see the sketch after this list).

3. **Check Artifact-Store Access**: Verify that the identity running the stream can read `dbfs:/databricks/mlflow-tracking/...` and that any token or credential settings match the notebook session where the load succeeds.

4. **Verify Artifact Path**: Confirm that the artifact path for the model (`runs:/xxx/yyy`) is correct and accessible from within the stream.

5. **Handle Exceptions Gracefully**: Use a try-except block to handle exceptions during model loading and set a tag to indicate the exception.
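
A minimal sketch of points 2 and 5, based on the snippets above (the `predict` call, the feature handling, and the `batch_process_error` tag name are illustrative assumptions, not a confirmed fix):

```python
import mlflow
import mlflow.sklearn

# Point 2: load the model once on the driver, where the download is known to
# work, instead of re-downloading the artifacts in every micro-batch.
loaded_model = mlflow.sklearn.load_model("runs:/xxx/yyy")

def batch_process(microDF, batchId):
    try:
        # foreachBatch executes this function on the driver, so the closure
        # can reuse the driver-side model directly.
        pdf = microDF.toPandas()
        pdf["prediction"] = loaded_model.predict(pdf)  # feature columns are illustrative
        # ... write pdf out wherever the batch output belongs ...
    except Exception as e:
        # Point 5: record the failure as a run tag instead of only crashing.
        with mlflow.start_run():
            mlflow.set_tag("batch_process_error", str(e))
        raise

(spark.readStream.format("delta")
    .option("ignoreChanges", "true")
    .load("s3a://bucket-xxx/xxx")
    .writeStream
    .option("checkpointLocation", "/tmp/garbage")
    .foreachBatch(batch_process)
    .start())
```

Note that this only works because `foreachBatch` runs the function on the driver; if the model were used inside executor-side UDFs, you would need to broadcast it or load it per executor instead.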

If you need more help, please provide more details!

amitca71
Contributor II

Downgrading the runtime to version 13.3 did the trick.
