@Kumaran Thanks for the reply, Kumaran 🙂
The deployment finally succeeded for the Random Forest algorithm, but it is still failing for SparkXGBRegressor.
Sharing code snippet:
from xgboost.spark import SparkXGBRegressor
# Assemble every column after the first one into a single "features" vector.
feature_columns = train_df.columns[1:]
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Regressor under test. The Random Forest alternative is kept for reference:
# rf = RandomForestRegressor(labelCol="price", maxBins=260, seed=42)
xgbr = SparkXGBRegressor(
    num_workers=1,
    label_col="price",
    missing=0.0,
)

# Two-stage pipeline: feature assembly followed by the regressor.
pipeline = Pipeline(stages=[vec_assembler, xgbr])

# First evaluator leaves metricName unset (evaluator default); the second
# one explicitly reports r2.
regression_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
)
regression_evaluator2 = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
    metricName="r2",
)
def objective_function(params):
    """Train and score one hyperparameter candidate for the Hyperopt search.

    Args:
        params: Mapping sampled from the search space. Must provide the keys
            "max_depth" and "n_estimators".

    Returns:
        The value of ``regression_evaluator`` on the ``test_df`` predictions,
        which the search minimizes.
    """
    # The search space draws these from NumPy integer arrays, so they arrive
    # as NumPy scalars. Cast to built-in int so the values stored on the
    # estimator are plain Python ints (NumPy ints are not JSON-serializable,
    # which presumably matters when the fitted pipeline is persisted by
    # log_model -- TODO confirm this is the cause of the failure).
    max_depth = int(params["max_depth"])  # rf, xgb
    n_estimators = int(params["n_estimators"])  # xgb
    # Random Forest variant: num_trees = params["num_trees"]

    with mlflow.start_run():
        # Random Forest variant:
        # estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})
        estimator = pipeline.copy(
            {xgbr.max_depth: max_depth, xgbr.n_estimators: n_estimators}
        )
        model = estimator.fit(train_df)

        # NOTE(review): candidates are scored on test_df, the same frame the
        # final model is evaluated on later -- confirm this is intended.
        preds = model.transform(test_df)
        rmse = regression_evaluator.evaluate(preds)
        # r2 = regression_evaluator2.evaluate(preds)
        mlflow.log_metric("rmse", rmse)

        # Alternative: conda_env=mlflow.spark.get_default_conda_env()
        mlflow.spark.log_model(model, "model", conda_env=conda_env)

    return rmse
from hyperopt import hp
import numpy as np
# Candidate values for each tuned hyperparameter (upper bounds are exclusive).
max_depth_options = np.arange(5, 15, dtype=int)
n_estimators_options = np.arange(70, 80, dtype=int)

# Hyperopt search space: each parameter is a categorical choice among the
# candidate values above.
search_space = {
    "max_depth": hp.choice('max_depth', max_depth_options),
    "n_estimators": hp.choice('n_estimators', n_estimators_options),
}
from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
# Only the XGBoost autologger is enabled; the alternatives are kept for reference.
# mlflow.pyspark.ml.autolog(log_models=True)
# mlflow.fastai.autolog(log_models=False)
mlflow.xgboost.autolog(log_models=True)

# Number of candidates the search evaluates.
num_evals = 1
trials = Trials()

# Seeded generator so the sampled candidates are reproducible.
search_rng = np.random.default_rng(42)

# Run the TPE search over search_space, minimizing objective_function.
best_hyperparam = fmin(
    fn=objective_function,
    space=search_space,
    algo=tpe.suggest,
    max_evals=num_evals,
    trials=trials,
    rstate=search_rng,
)
# Refit the pipeline with the best hyperparameters found by the search
# (on at most 188123 rows of train_df) and evaluate it on test_df.
from hyperopt import space_eval

# For hp.choice parameters, fmin() returns the *index* of the selected option,
# not the option itself. space_eval() maps those indices back to the actual
# values from the search space.
best_params = space_eval(search_space, best_hyperparam)

with mlflow.start_run():
    # Cast to built-in int: the search space holds NumPy integer scalars.
    best_max_depth = int(best_params["max_depth"])  # rf, xgb
    best_n_estimators = int(best_params["n_estimators"])  # xgb

    estimator = pipeline.copy(
        {xgbr.max_depth: best_max_depth, xgbr.n_estimators: best_n_estimators}
    )  # xgb
    pipeline_model = estimator.fit(train_df.limit(188123))
    pred_df = pipeline_model.transform(test_df)
    rmse = regression_evaluator.evaluate(pred_df)

    # Log params and metrics for the final model.
    mlflow.log_param("maxDepth", best_max_depth)
    mlflow.log_param("n_estimators", best_n_estimators)
    mlflow.log_metric("rmse", rmse)