Hi guys,
We're facing some weird behavior, or perhaps we're missing some configuration in our code. I've searched for information without success. Let me explain our case: we have implemented a custom Evaluator in Python using the PySpark API, similar to the following code snippet:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,Evaluator
from pyspark.ml.util import DefaultParamsReadable,DefaultParamsWritable
import time
class BinaryEvaluator(BinaryClassificationEvaluator):
    """Subclass of ``BinaryClassificationEvaluator`` that adds and overrides nothing."""
class DummyEvaluator(Evaluator, DefaultParamsWritable, DefaultParamsReadable):
    """Minimal custom evaluator that always reports a fixed metric of 0.9.

    It mixes in ``DefaultParamsWritable`` and ``DefaultParamsReadable`` in
    addition to ``Evaluator``.
    """

    def __init__(self):
        """Initialise the evaluator through the pyspark base classes."""
        # Delegate to the base initialisers instead of hand-assigning ``uid``
        # (previously the integer 1) and the private param maps; pyspark's
        # ``Params``/``Identifiable`` are expected to set these up, including
        # a string uid -- see the pyspark.ml.param documentation.
        super().__init__()

    def _evaluate(self, dataset):
        """Return the constant score 0.9; ``dataset`` is not inspected."""
        return 0.9

    def isLargerBetter(self):
        """Return True: a larger metric value is treated as better."""
        return True

    # Backward-compatible alias for the original, misspelled method name.
    isLargetBetter = isLargerBetter
The problem arises when the DummyEvaluator is serialized and stored in the Storage Account. To do that, the following code is used:
# Persist the evaluator to the storage account; the host is spelled
# "dfs.core.windows.net", matching the account named in the error output.
DummyEvaluator().save("abfss://mycontainer@mystacc.dfs.core.windows.net/mymodel")
And the following error occurs:
Failure to initialize configuration for storage account mystacc.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File <command-2244732439603884>, line 1
----> 1 DummyEvaluator().save(f'{base_path}/dummy_{int(time.time())}')
File /databricks/spark/python/pyspark/ml/util.py:355, in MLWritable.save(self, path)
353 def save(self, path: str) -> None:
354 """Save this ML instance to the given path, a shortcut of 'write().save(path)'."""
--> 355 self.write().save(path)
File /databricks/spark/python/pyspark/ml/util.py:249, in MLWriter.save(self, path)
247 if self.shouldOverwrite:
248 self._handleOverwrite(path)
--> 249 self.saveImpl(path)
File /databricks/spark/python/pyspark/ml/util.py:519, in DefaultParamsWriter.saveImpl(self, path)
518 def saveImpl(self, path: str) -> None:
--> 519 DefaultParamsWriter.saveMetadata(self.instance, path, self.sc)
File /databricks/spark/python/pyspark/ml/util.py:559, in DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata, paramMap)
555 metadataPath = os.path.join(path, "metadata")
556 metadataJson = DefaultParamsWriter._get_metadata_to_save(
557 instance, sc, extraMetadata, paramMap
558 )
--> 559 sc.parallelize([metadataJson], 1).saveAsTextFile(metadataPath)
File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res
File /databricks/spark/python/pyspark/rdd.py:3457, in RDD.saveAsTextFile(self, path, compressionCodecClass)
3455 self.ctx._jvm.PythonRDD.saveAsTextFileImpl(keyed._jrdd, path, compressionCodecClass)
3456 else:
-> 3457 self.ctx._jvm.PythonRDD.saveAsTextFileImpl(keyed._jrdd, path)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1349 command = proto.CALL_COMMAND_NAME +\
1350 self.command_header +\
1351 args_command +\
1352 proto.END_COMMAND_PART
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
1359 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
186 def deco(*a: Any, **kw: Any) -> Any:
187 try:
--> 188 return f(*a, **kw)
189 except Py4JJavaError as e:
190 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsTextFileImpl.
: Failure to initialize configuration for storage account mystacc.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:52)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:682)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:2061)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:266)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:230)
at com.databricks.common.filesystem.LokiABFS.initialize(LokiABFS.scala:36)
at com.databricks.common.filesystem.LokiFileSystem$.$anonfun$getLokiFS$1(LokiFileSystem.scala:148)
at com.databricks.common.filesystem.Cache.getOrCompute(Cache.scala:38)
at com.databricks.common.filesystem.LokiFileSystem$.getLokiFS(LokiFileSystem.scala:145)
at com.databricks.common.filesystem.LokiFileSystem.$anonfun$initialize$1(LokiFileSystem.scala:191)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.common.filesystem.LokiFileSystem.initialize(LokiFileSystem.scala:183)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:537)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at com.databricks.unity.SAM.createDelegate(SAM.scala:183)
at com.databricks.unity.SAM.createDelegate$(SAM.scala:175)
at com.databricks.unity.ClusterDefaultSAM$.createDelegate(SAM.scala:193)
at com.databricks.sql.acl.fs.CredentialScopeFileSystem.createDelegate(CredentialScopeFileSystem.scala:83)
at com.databricks.sql.acl.fs.CredentialScopeFileSystem.$anonfun$setDelegates$1(CredentialScopeFileSystem.scala:136)
at com.databricks.sql.acl.fs.Lazy.apply(DelegatingFileSystem.scala:310)
at com.databricks.sql.acl.fs.CredentialScopeFileSystem.getWorkingDirectory(CredentialScopeFileSystem.scala:259)
at org.apache.spark.internal.io.SparkHadoopWriterUtils$.createPathFromString(SparkHadoopWriterUtils.scala:90)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1680)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1680)
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1666)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1666)
at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:573)
at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:572)
at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
at org.apache.spark.api.python.PythonRDD$._saveAsTextFile(PythonRDD.scala:931)
at org.apache.spark.api.python.PythonRDD$.saveAsTextFileImpl(PythonRDD.scala:899)
at org.apache.spark.api.python.PythonRDD.saveAsTextFileImpl(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
Caused by: Invalid configuration value detected for fs.azure.account.key
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.ConfigurationBasicValidator.validate(ConfigurationBasicValidator.java:49)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator.validate(Base64StringConfigurationBasicValidator.java:40)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.validateStorageAccountKey(SimpleKeyProvider.java:71)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:49)
... 76 more
But the BinaryEvaluator is serialized without any problems:
# Same save call with the built-in-derived evaluator; the host is spelled
# "dfs.core.windows.net", matching the account named in the error output.
BinaryEvaluator().save("abfss://mycontainer@mystacc.dfs.core.windows.net/mymodel")
The cluster has the configuration "Enable credential passthrough for user-level data access" set to True. The Storage Account container authentication method is Microsoft Entra User account.
How should the DummyEvaluator be implemented in order to be serialized like the BinaryEvaluator? Are we missing some trait that must be implemented too?
Thanks in advance for your help.
Regards,