Hi,
We are facing an mlflow.exceptions.MlflowException when mlflow is called from stream,
when we load the model outside the stream, its loaded fine, while when we load it from within stream it fails with exception. to emphasize that it was working till lately.
compute: 14.3.x-cpu-ml-scala2.12
def load_aspect_model():
mflow_client = MlflowClient()
return (mlflow.sklearn.load_model(f"runs:/xxx/yyy"))
1) works fine:
loaded_model=load_aspect_model()
2) exception
def batch_process(microDF,batchId๐
loaded_model=load_aspect_model()
reviews_quotes_stream = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("s3a://bucket-xxx/xxx")
reviews_quotes_stream = reviews_quotes_stream.writeStream \
.format("delta") \
.option("checkpointLocation", f"/tmp/garbage") \
.foreachBatch(batch_process) \
.start()
File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) File "/databricks/spark/python/pyspark/sql/utils.py", line 122, in call raise e File "/databricks/spark/python/pyspark/sql/utils.py", line 119, in call self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) File "/root/.ipykernel/11277/command-4409721816615086-1353001946", line 2, in batch_process loaded_model=load_aspect_model() File "/root/.ipykernel/11277/command-4409721816614710-1735106079", line 12, in load_aspect_model return (mlflow.sklearn.load_model(f"runs:/xxx/yyy")) File "/databricks/python/lib/python3.10/site-packages/mlflow/sklearn/__init__.py", line 613, in load_model local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path) File "/databricks/python/lib/python3.10/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri return get_artifact_repository(artifact_uri=root_uri).download_artifacts( File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/runs_artifact_repo.py", line 125, in download_artifacts return self.repo.download_artifacts(artifact_path, dst_path) File "/databricks/python/lib/python3.10/site-packages/mlflow/store/artifact/artifact_repo.py", line 222, in download_artifacts raise MlflowException( mlflow.exceptions.MlflowException: The following failures occurred while downloading one or more artifacts from dbfs:/databricks/mlflow-tracking/56775009587862/c339b9cea76e44dd94043b91cc4fb42f/artifacts: ##### File category_aspects_model_1185_en_us/model.pkl #####
##### File category_aspects_model_1185_en_us/requirements.txt ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/conda.yaml ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/MLmodel ##### Invalid metric 'refreshableTokenNotFound' ##### File category_aspects_model_1185_en_us/python_env.yaml ##### Invalid metric 'refreshableTokenNotFound' at py4j.Protocol.getReturnValue(Protocol.java:476) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) at com.sun.proxy.$Proxy165.call(Unknown Source) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:359) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:136) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:163) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:136) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:530) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:88) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1406) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1207) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1406) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:340) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:612) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:238) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:159) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1399) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1766) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:733) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1734) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:729) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:204) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:689) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:683) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:83) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:71) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:128) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:141) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:667) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:451) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:445) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1173) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:396) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:25) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455) at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:25) at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:70) at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:170) at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603) at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612) at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491) at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489) at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:83) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:281) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:280)
Any idea?
thanks