
Serializing a custom Spark MLlib Evaluator

CharlesFlores
New Contributor II

Hi guys,

We're seeing some odd behavior, or perhaps we're missing some configuration in our code; I've tried to find information about it without success. Let me explain our case: we have implemented a custom Evaluator in Python using the PySpark API, something like the following code snippet:

 

from pyspark.ml.evaluation import BinaryClassificationEvaluator, Evaluator
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import time

# Thin wrapper around the built-in, Java-backed evaluator
class BinaryEvaluator(BinaryClassificationEvaluator):
    pass

# Pure-Python evaluator, made persistable via DefaultParamsWritable/DefaultParamsReadable
class DummyEvaluator(Evaluator, DefaultParamsWritable, DefaultParamsReadable):
    def __init__(self):
        self.uid = 1
        self._paramMap = {}
        self._defaultParamMap = {}

    def _evaluate(self, dataset):
        return 0.9

    def isLargerBetter(self):
        return True

 

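For completeness, calling evaluate() directly on the DummyEvaluator is not the problem; a minimal check (the toy DataFrame below is made up purely for illustration) simply returns the hard-coded metric:

 

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Illustrative predictions frame only; in our pipeline it comes from a fitted model
preds = spark.createDataFrame([(1.0, 0.9), (0.0, 0.1)], ["label", "rawPrediction"])
print(DummyEvaluator().evaluate(preds))  # prints 0.9, the hard-coded dummy metric

 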
The problem arises when the DummyEvaluator is serialized and stored in the Azure Storage Account. To do that, the following code is used:

 

DummyEvaluator().save("abfss://mycontainer@mystacc.dfs.core.windows.net/mymodel")

 

And the following error occurs:

 

Failure to initialize configuration for storage account mystacc.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-2244732439603884>, line 1
----> 1 DummyEvaluator().save(f'{base_path}/dummy_{int(time.time())}')

File /databricks/spark/python/pyspark/ml/util.py:355, in MLWritable.save(self, path)
    353 def save(self, path: str) -> None:
    354     """Save this ML instance to the given path, a shortcut of 'write().save(path)'."""
--> 355     self.write().save(path)

File /databricks/spark/python/pyspark/ml/util.py:249, in MLWriter.save(self, path)
    247 if self.shouldOverwrite:
    248     self._handleOverwrite(path)
--> 249 self.saveImpl(path)

File /databricks/spark/python/pyspark/ml/util.py:519, in DefaultParamsWriter.saveImpl(self, path)
    518 def saveImpl(self, path: str) -> None:
--> 519     DefaultParamsWriter.saveMetadata(self.instance, path, self.sc)

File /databricks/spark/python/pyspark/ml/util.py:559, in DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata, paramMap)
    555 metadataPath = os.path.join(path, "metadata")
    556 metadataJson = DefaultParamsWriter._get_metadata_to_save(
    557     instance, sc, extraMetadata, paramMap
    558 )
--> 559 sc.parallelize([metadataJson], 1).saveAsTextFile(metadataPath)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/rdd.py:3457, in RDD.saveAsTextFile(self, path, compressionCodecClass)
   3455     self.ctx._jvm.PythonRDD.saveAsTextFileImpl(keyed._jrdd, path, compressionCodecClass)
   3456 else:
-> 3457     self.ctx._jvm.PythonRDD.saveAsTextFileImpl(keyed._jrdd, path)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsTextFileImpl.
: Failure to initialize configuration for storage account mystacc.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:52)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:682)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:2061)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:266)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:230)
	at com.databricks.common.filesystem.LokiABFS.initialize(LokiABFS.scala:36)
	at com.databricks.common.filesystem.LokiFileSystem$.$anonfun$getLokiFS$1(LokiFileSystem.scala:148)
	at com.databricks.common.filesystem.Cache.getOrCompute(Cache.scala:38)
	at com.databricks.common.filesystem.LokiFileSystem$.getLokiFS(LokiFileSystem.scala:145)
	at com.databricks.common.filesystem.LokiFileSystem.$anonfun$initialize$1(LokiFileSystem.scala:191)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.common.filesystem.LokiFileSystem.initialize(LokiFileSystem.scala:183)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:537)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at com.databricks.unity.SAM.createDelegate(SAM.scala:183)
	at com.databricks.unity.SAM.createDelegate$(SAM.scala:175)
	at com.databricks.unity.ClusterDefaultSAM$.createDelegate(SAM.scala:193)
	at com.databricks.sql.acl.fs.CredentialScopeFileSystem.createDelegate(CredentialScopeFileSystem.scala:83)
	at com.databricks.sql.acl.fs.CredentialScopeFileSystem.$anonfun$setDelegates$1(CredentialScopeFileSystem.scala:136)
	at com.databricks.sql.acl.fs.Lazy.apply(DelegatingFileSystem.scala:310)
	at com.databricks.sql.acl.fs.CredentialScopeFileSystem.getWorkingDirectory(CredentialScopeFileSystem.scala:259)
	at org.apache.spark.internal.io.SparkHadoopWriterUtils$.createPathFromString(SparkHadoopWriterUtils.scala:90)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1680)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1680)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1666)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:451)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1666)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:573)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:572)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at org.apache.spark.api.python.PythonRDD$._saveAsTextFile(PythonRDD.scala:931)
	at org.apache.spark.api.python.PythonRDD$.saveAsTextFileImpl(PythonRDD.scala:899)
	at org.apache.spark.api.python.PythonRDD.saveAsTextFileImpl(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: Invalid configuration value detected for fs.azure.account.key
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.ConfigurationBasicValidator.validate(ConfigurationBasicValidator.java:49)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator.validate(Base64StringConfigurationBasicValidator.java:40)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.validateStorageAccountKey(SimpleKeyProvider.java:71)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:49)
	... 76 more

 

But the BinaryEvaluator is serialized without any problems:

 

BinaryEvaluator().save("abfss://mycontainer@mystacc.dfs.core.windows.net/mymodel")

 

The cluster has "Enable credential passthrough for user-level data access" set to True, and the Storage Account container's authentication method is Microsoft Entra user account.
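
For reference, the setting the error complains about, fs.azure.account.key, is what one would configure when authenticating with a storage account access key instead of passthrough; a minimal sketch of that explicit setup (placeholder value, not something we have configured) would be:

 

# Sketch only: explicit account-key authentication for ABFS, with a placeholder value.
# We rely on credential passthrough instead, so nothing like this is set on our cluster.
spark.conf.set(
    "fs.azure.account.key.mystacc.dfs.core.windows.net",
    "<storage-account-access-key>"
)

 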

How should the DummyEvaluator be implemented so that it serializes like the BinaryEvaluator? Are we missing some trait (mixin) that must be implemented as well?

Thanks in advance for your help.

Regards,

1 REPLY

Hi, @Retired_mod 

Thanks for your reply. I can't understand why serializing the BinaryEvaluator works without this configuration set while the DummyEvaluator does not.

Both instances are created under the same SparkSession (in the same notebook), so if the configuration were really missing, neither of them should be able to serialize.
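
For illustration, this is essentially the back-to-back comparison we run in the same notebook (base_path points at the same abfss:// location shown above; the path suffixes are just for illustration):

 

base_path = "abfss://mycontainer@mystacc.dfs.core.windows.net/mymodel"

# Java-backed evaluator: write() goes through the JVM MLWriter and succeeds
BinaryEvaluator().save(f"{base_path}/binary_{int(time.time())}")

# Pure-Python evaluator: DefaultParamsWriter.saveMetadata ends up calling
# sc.parallelize([metadataJson], 1).saveAsTextFile(...) and fails with the ABFS key error
DummyEvaluator().save(f"{base_path}/dummy_{int(time.time())}")

 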
