Py4JError: An error occurred while calling o992.resourceProfileManager

rahuja
New Contributor III

Hello 

I am trying to run SparkXGBRegressor and I am getting the following error:

Py4JError: An error occurred while calling o992.resourceProfileManager. Trace:
py4j.security.Py4JSecurityException: Method public org.apache.spark.resource.ResourceProfileManager org.apache.spark.SparkContext.resourceProfileManager() is not whitelisted on class class org.apache.spark.SparkContext
    at py4j.security.WhitelistingPy4JSecurityManager.checkCall(WhitelistingPy4JSecurityManager.java:473)
    at py4j.Gateway.invoke(Gateway.java:305)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)

Here is my custom model class and the code I am running:

from pathlib import Path

import mlflow
import pyspark
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor

# MLflowModelSignatureMixin, SparkSerializationMixin, MLflowModelRegistration
# and load_model come from our own helper module (imports not shown here)

class SparkClassificationModelManual(MLflowModelSignatureMixin, SparkSerializationMixin):
    def __init__(self, inputCols, outputCol):
        super().__init__()
        self.inputCols = inputCols
        self.outputCol = outputCol
        self.featuresCol = "features"
        self.scaledFeaturesCol = "scaledFeatures"
        self.path = None  # Compulsory
        #self._model = LogisticRegression(featuresCol="scaledFeatures", labelCol=self.outputCol)
        self._model = SparkXGBRegressor(features_col="scaledFeatures", label_col=self.outputCol)
        self._scaler = StandardScaler(inputCol=self.featuresCol, outputCol=self.scaledFeaturesCol, withStd=True, withMean=False)
        self.assembler = VectorAssembler(inputCols=self.inputCols, outputCol=self.featuresCol)
       
    def fit(self, df: pyspark.sql.DataFrame) -> None:
        # Combine feature columns into a single vector column
        assembled_df = self.assembler.transform(df)
        # Scale the features
        self._scaler = self._scaler.fit(assembled_df)
        scaled_df = self._scaler.transform(assembled_df)
        # Fit the XGBoost regressor
        self._model = self._model.fit(scaled_df)
       
    def save(self, path):
        self.path = str(Path(path).parent)
        super().save(path)

    def load(self, path):
        self.path = str(Path(path).parent)
        return super().load(path)

    def predict(self, test_df):
        # Assuming the model has been fitted and the same transformations are applied to test data
        assembled_test_df = self.assembler.transform(test_df)
        scaled_test_df = self._scaler.transform(assembled_test_df)
        # The fitted SparkXGBRegressorModel is a Spark ML model, so scoring is done
        # with transform(), which returns a DataFrame with a prediction column
        predictions = self._model.transform(scaled_test_df)
        return predictions
 
if __name__ == '__main__':
    spark = SparkSession.builder.appName("ExampleApp").getOrCreate()
    model = SparkClassificationModelManual(inputCols=["feature1", "feature2", "feature3"], outputCol="label")
    mlflow_reg = MLflowModelRegistration(
        model=model,
        model_reg_name='spark_testing_preprocess',
        model_reg_tags={"testing": "spark"}
    )
    data = spark.createDataFrame([
        (0, 0.1, 0.3, 1),
        (1, 0.2, 0.5, 0),
        (0, 0.5, 0.8, 1),
        (1, 0.3, 0.7, 0)
    ], ["feature1", "feature2", "feature3", "label"])
    mlflow.end_run()
    with mlflow_reg:
        model.fit(data)

    model = load_model(SparkClassificationModelManual, name="spark_testing_preprocess", version=2)
    res = model.predict(data)
    type(res)
    res.show()
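
If it helps to narrow this down, the accessor named in the trace can be called directly through Py4J, outside of XGBoost entirely. The snippet below is only a diagnostic sketch and relies on a PySpark internal (_jsc is not public API); on a cluster where the whitelisting security manager from the trace is active, it should raise the same Py4JSecurityException:

from py4j.protocol import Py4JError
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
try:
    # The same JVM method named in the trace: SparkContext.resourceProfileManager()
    spark.sparkContext._jsc.sc().resourceProfileManager()
    print("resourceProfileManager is reachable on this cluster")
except Py4JError as e:
    print(f"Blocked by Py4J security: {e}")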
1 REPLY

rahuja
New Contributor III

Hello Kaniz

I am currently using:

  • pyspark: 3.5.0, which is the default in the Databricks Runtime 14.3 LTS ML
  • xgboost: 1.7.6

I have also checked the driver logs, and the problem does not seem to be caused by any UDFs. Is there anything else that can be tried?

I have verified that the code works perfectly fine on a single-node cluster but somehow throws this error on a multi-node cluster. Here are the configurations of the two clusters:

1. Single-node cluster

  • Databricks runtime version: 14.3 LTS ML (includes Apache Spark 3.5.0, Scala 2.12)
  • Node Type: Standard_D4ds_v5

The code runs perfectly fine in this one.

2. Multi-node interactive cluster

  • Databricks runtime version: 14.3 LTS ML (includes Apache Spark 3.5.0, Scala 2.12)
  • Node Type: Standard_D4ds_v5
  • Min Workers: 1
  • Max Workers: 3

How can two clusters with the same runtime and library versions behave so differently, with one running perfectly fine while the other throws this error?
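
One difference I can think of testing (an assumption on my side, not something I have confirmed): distributed XGBoost training may only take its multi-worker code path when more than one worker is available, and that path could be what reaches the non-whitelisted SparkContext.resourceProfileManager() call. Since num_workers is a regular SparkXGBRegressor parameter, pinning it explicitly on both clusters would show whether the failure tracks the distributed path:

from xgboost.spark import SparkXGBRegressor

# Fit each of these on the same data, on both clusters. If only the
# num_workers=2 estimator raises the Py4JSecurityException, the error
# follows the distributed training path rather than the cluster itself.
reg_local = SparkXGBRegressor(features_col="scaledFeatures", label_col="label", num_workers=1)
reg_dist = SparkXGBRegressor(features_col="scaledFeatures", label_col="label", num_workers=2)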
