Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

mlflow.exceptions.MlflowException - Invalid metric 'refreshableTokenNotFound'

amitca71
Contributor II
Hi,
We are hitting an mlflow.exceptions.MlflowException when MLflow is called from a stream.
When we load the model outside the stream, it loads fine, but when we load it from within the stream (inside foreachBatch), it fails with the exception below. To emphasize: this was working until recently.
compute: 14.3.x-cpu-ml-scala2.12
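import mlflow
from mlflow.tracking import MlflowClient

def load_aspect_model():
    mlflow_client = MlflowClient()  # created in the original snippet but not used
    return (mlflow.sklearn.load_model(f"runs:/xxx/yyy"))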
1) Works fine:
loaded_model=load_aspect_model()
2) Raises the exception:
def batch_process(microDF, batchId):
  loaded_model=load_aspect_model()
reviews_quotes_stream = spark.readStream.format("delta") \
                        .option("ignoreChanges", "true") \
                        .load("s3a://bucket-xxx/xxx")
reviews_quotes_stream = reviews_quotes_stream.writeStream \
  .format("delta") \
  .option("checkpointLocation", f"/tmp/garbage") \
  .foreachBatch(batch_process) \
  .start()
  File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 122, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/root/.ipykernel/11277/command-4409721816615086-1353001946", line 2, in batch_process
    loaded_model=load_aspect_model()
  File "/root/.ipykernel/11277/command-4409721816614710-1735106079", line 12, in load_aspect_model
    return (mlflow.sklearn.load_model(f"runs:/xxx/yyy"))
  File "/databricks/python/lib/python3.10/site-packages/mlflow/sklearn/__init__.py", line 613, in load_model
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
  File "/databricks/python/lib/python3.10/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri
    return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
  File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/runs_artifact_repo.py", line 125, in download_artifacts
    return self.repo.download_artifacts(artifact_path, dst_path)
  File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/artifact_repo.py", line 222, in download_artifacts
    raise MlflowException(
mlflow.exceptions.MlflowException: The following failures occurred while downloading one or more artifacts from dbfs:/databricks/mlflow-tracking/56775009587862/c339b9cea76e44dd94043b91cc4fb42f/artifacts:
##### File category_aspects_model_1185_en_us/model.pkl #####
##### File category_aspects_model_1185_en_us/requirements.txt ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/conda.yaml ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/MLmodel ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/python_env.yaml ##### Invalid metric 'refreshableTokenNotFound' at py4j.Protocol.getReturnValue(Protocol.java:476) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) at com.sun.proxy.$Proxy165.call(Unknown Source) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:136) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:163) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:136) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:530) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:88) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1406) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1207) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1406) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:340) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:612) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:238) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:159) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1766) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1734) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:729) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:689) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:683) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:667) 
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:83) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:71) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:128) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:141) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:451) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:445) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:396) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at 
com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:280)
 
Any ideas?
Thanks
2 REPLIES

Kaniz_Fatma
Community Manager

Hi @amitca71, It looks like you're having trouble with an `MlflowException` when trying to load a model from within a stream.

Here's a simplified explanation of the issue and some steps to troubleshoot it:

**Issue**: When you load the model inside the stream (from the `foreachBatch` function), the artifact download fails with `Invalid metric 'refreshableTokenNotFound'`, while the same call outside the stream succeeds.

**Solution**:

1. **Check Environment**: Make sure the environment inside and outside the stream is the same, including any variables or configurations.

2. **Use MLflow Run Context**: Inside the stream, start an MLflow run context using `with mlflow.start_run() as mlrun:` to ensure the correct run context.

3. **Log Parameters**: Inside the stream, log relevant parameters using `mlflow.log_param('input_rows', train_tweets.shape[0])`, where `train_tweets` stands in for your own DataFrame.

4. **Verify Artifact Path**: Confirm that the artifact path for the model (`f"runs:/xxx/yyy"`) is correct and accessible from within the stream.

5. **Handle Exceptions Gracefully**: Use a try-except block to handle exceptions during model loading and set a tag to indicate the exception.
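
Since the question reports that loading the model outside the stream works, one possible workaround is to load it once on the driver and hand the loaded object to the `foreachBatch` function, with the load wrapped in a try-except as step 5 suggests. The sketch below is not a confirmed fix for this error. The helper names `load_model_safely`, `make_batch_processor`, and `handle_batch` are hypothetical, and the MLflow and Spark calls appear only in comments, so the block runs without either library.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("stream_model_loading")


def load_model_safely(loader: Callable[[], Any]) -> Optional[Any]:
    """Call `loader` and return its result, or None if it raises.

    `loader` is any zero-argument callable, for example
    lambda: mlflow.sklearn.load_model("runs:/xxx/yyy").
    """
    try:
        return loader()
    except Exception as exc:
        # Step 5 suggests recording the failure; on a live run this could
        # also be a call such as mlflow.set_tag("model_load_error", str(exc)).
        logger.error("Model loading failed: %s", exc)
        return None


def make_batch_processor(model: Any, handle_batch: Callable[[Any, Any, int], None]):
    """Build a foreachBatch-compatible function that reuses `model`.

    The model is captured in a closure, so nothing is downloaded
    from inside the micro-batch itself.
    """
    def batch_process(micro_df, batch_id):
        handle_batch(model, micro_df, batch_id)
    return batch_process


# Hypothetical wiring, assuming mlflow and a streaming DataFrame are available:
#   loaded_model = load_model_safely(lambda: mlflow.sklearn.load_model("runs:/xxx/yyy"))
#   stream.writeStream.foreachBatch(make_batch_processor(loaded_model, score_fn)).start()
```

Whether the captured model behaves correctly inside the micro-batch depends on the cluster setup, so it is worth checking on a small stream first.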

If you need more help, please provide more details!

amitca71
Contributor II

Downgrading the compute to version 13.3 did the trick.
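
If the version here refers to the cluster runtime (the question lists compute `14.3.x-cpu-ml-scala2.12`), a job could check at startup which runtime it is on before starting the stream. The sketch below assumes the cluster exposes its runtime version through the `DATABRICKS_RUNTIME_VERSION` environment variable; the helper names are hypothetical, and `(13, 3)` is simply the version reported to work in this thread.

```python
import os
from typing import Optional, Tuple


def parse_runtime_version(raw: Optional[str]) -> Optional[Tuple[int, int]]:
    """Parse '13.3' or '14.3.x-cpu-ml-scala2.12' into (major, minor)."""
    if not raw:
        return None
    parts = raw.split(".")
    try:
        return int(parts[0]), int(parts[1])
    except (IndexError, ValueError):
        return None


def matches_runtime(raw: Optional[str], expected: Tuple[int, int] = (13, 3)) -> bool:
    """Return True only if the parsed version equals `expected`."""
    return parse_runtime_version(raw) == expected


# Assumption: this variable is set on the cluster; it is None elsewhere.
current = os.environ.get("DATABRICKS_RUNTIME_VERSION")
if not matches_runtime(current):
    print(f"Runtime {current!r} is not the 13.3 version reported to work in this thread")
```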
