Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
SparkXGBRegressor and RandomForestRegressor failing to deploy for inference

raghagra
New Contributor III

I have been trying to deploy Spark ML models from the experiment page via the UI, but the deployment gets aborted after a long run. Is there any particular reason why this might be happening? I have also taken care of the dependencies, yet it is still failing.

Dependency code block (note the comma after the Python pin, which the original post was missing):

conda_env = {
    "dependencies": [
        "python=3.10.9",
        {
            "pip": ["xgboost", "pyspark==3.4.0", "pip<=21.2.4"],
        },
    ],
}
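For context, this is roughly how such an environment gets passed when logging the model; a minimal sketch, assuming pipeline_model is a fitted Spark ML pipeline from the training run:

import mlflow
import mlflow.spark

with mlflow.start_run():
    # `pipeline_model` is assumed to be a fitted pyspark.ml PipelineModel.
    mlflow.spark.log_model(
        pipeline_model,
        artifact_path="model",
        conda_env=conda_env,  # the environment defined above
    )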


2 REPLIES

Kumaran
Databricks Employee

Hello @raghagra,

We appreciate your question posted in the Databricks community.

Without a look at the code, it's difficult to determine the exact cause of the issue with SparkXGBRegressor. Could you kindly share the code snippet that is causing this problem?

In the meantime, I recommend trying the Random Forest model with maxDepth set to its default value; that may mitigate the issue. According to the documentation, the default value for maxDepth is 5.

Here's an example code snippet:

from pyspark.ml.regression import RandomForestRegressor

# `target` is the name of your label column.
rfModel = RandomForestRegressor(featuresCol="features", labelCol=target, maxDepth=5, numTrees=50)
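And, for completeness, a minimal sketch of fitting and evaluating it, assuming train_df and test_df already carry a "features" vector column and the label column named by target:

from pyspark.ml.evaluation import RegressionEvaluator

# Assumes train_df/test_df contain a "features" vector column and the
# label column referenced by `target`.
fitted = rfModel.fit(train_df)
preds = fitted.transform(test_df)

# RegressionEvaluator defaults to RMSE as the metric.
rmse = RegressionEvaluator(predictionCol="prediction", labelCol=target).evaluate(preds)
print(f"RMSE: {rmse:.3f}")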

Please give it a try and let us know if it helps resolve the issue.

raghagra (Accepted Solution)
New Contributor III

@Kumaran Thanks for the reply, Kumaran 🙂

The deployment was finally successful for the Random Forest algorithm, but it is still failing for SparkXGBRegressor.

Sharing the code snippet:

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBRegressor

# Assemble every column except the label into a single feature vector.
vec_assembler = VectorAssembler(inputCols=train_df.columns[1:], outputCol="features")
# rf = RandomForestRegressor(labelCol="price", maxBins=260, seed=42)  # Random Forest variant
xgbr = SparkXGBRegressor(num_workers=1, label_col="price", missing=0.0)
pipeline = Pipeline(stages=[vec_assembler, xgbr])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")  # default metric: rmse
regression_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="r2")


def objective_function(params):
    # Hyperparameters that we want to tune.
    max_depth = params["max_depth"]        # rf and xgb
    # num_trees = params["num_trees"]      # rf only
    n_estimators = params["n_estimators"]  # xgb only

    with mlflow.start_run():
        # estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})  # rf
        estimator = pipeline.copy({xgbr.max_depth: max_depth, xgbr.n_estimators: n_estimators})  # xgb
        model = estimator.fit(train_df)

        preds = model.transform(test_df)
        rmse = regression_evaluator.evaluate(preds)
        # r2 = regression_evaluator2.evaluate(preds)
        mlflow.log_metric("rmse", rmse)
        # mlflow.spark.log_model(model, "model", conda_env=mlflow.spark.get_default_conda_env())
        mlflow.spark.log_model(model, "model", conda_env=conda_env)

    return rmse


from hyperopt import hp
import numpy as np

search_space = {
    "max_depth": hp.choice("max_depth", np.arange(5, 15, dtype=int)),
    "n_estimators": hp.choice("n_estimators", np.arange(70, 80, dtype=int)),
}

from hyperopt import fmin, tpe, Trials, space_eval
import mlflow
import mlflow.spark

# mlflow.pyspark.ml.autolog(log_models=True)
mlflow.xgboost.autolog(log_models=True)

num_evals = 1
trials = Trials()
best_hyperparam = fmin(fn=objective_function,
                       space=search_space,
                       algo=tpe.suggest,
                       max_evals=num_evals,
                       trials=trials,
                       rstate=np.random.default_rng(42))

# Retrain the model on the train & validation dataset and evaluate on the test dataset.
with mlflow.start_run():
    # fmin returns indices for hp.choice parameters, so map them back to values.
    best_params = space_eval(search_space, best_hyperparam)
    best_max_depth = best_params["max_depth"]
    best_n_estimators = best_params["n_estimators"]
    estimator = pipeline.copy({xgbr.max_depth: best_max_depth, xgbr.n_estimators: best_n_estimators})

    pipeline_model = estimator.fit(train_df.limit(188123))
    pred_df = pipeline_model.transform(test_df)
    rmse = regression_evaluator.evaluate(pred_df)

    # Log params and metrics for the final model.
    mlflow.log_param("maxDepth", best_max_depth)
    mlflow.log_param("n_estimators", best_n_estimators)
    mlflow.log_metric("rmse", rmse)
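As a side note, once a run like the one above has logged the pipeline, a minimal sketch for pulling it back for batch inference looks like this (the run ID placeholder is hypothetical; take the real one from the experiment page):

import mlflow.spark

# Hypothetical URI: substitute the run ID of the training run above.
model_uri = "runs:/<run_id>/model"
loaded = mlflow.spark.load_model(model_uri)

# Batch inference on a Spark DataFrame with the same input columns as train_df.
pred_df = loaded.transform(test_df)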
