โ07-12-2023 09:29 PM
I have been trying to deploy spark ML Models from the experiement page via UI, the deployment gets aborted after a long run, any particular reason for why this might be happening? I have also taken care of dependencies still it is failing.
Dependency code block:
conda_env={
"dependencies":
[
"python=3.10.9"
{
"pip":["xgboost","pyspark==3.4.0","pip<=21.2.4"],
},
],
}
โ07-13-2023 12:25 AM
@Kumaran Thanks for the reply kumaram ๐
The deployment was finally successful for Random Forest algorithm, failing for sparkxgbregressor.
Sharing code snippet:
from xgboost.spark import SparkXGBRegressor
vec_assembler = VectorAssembler(inputCols=train_df.columns[1:], outputCol="features")
#rf = RandomForestRegressor(labelCol="price", maxBins=260, seed=42)
xgbr = SparkXGBRegressor(num_workers=1, label_col="price", missing=0.0)
pipeline = Pipeline(stages=[vec_assembler, xgbr])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
regression_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
def objective_function(params):
# set the hyperparameters that we want to tune
max_depth = params["max_depth"]#rf,xgb
#num_trees = params["num_trees"]#rf
n_estimators = params["n_estimators"]#xgb
with mlflow.start_run():
#estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})#rf
estimator = pipeline.copy({xgbr.max_depth: max_depth, xgbr.n_estimators: n_estimators})#xgbr
model = estimator.fit(train_df)
preds = model.transform(test_df)
rmse = regression_evaluator.evaluate(preds)
#r2 = regression_evaluator2.evaluate(preds)
mlflow.log_metric("rmse", rmse)
# mlflow.spark.log_model(model, "model",conda_env=mlflow.spark.get_default_conda_env())
mlflow.spark.log_model(model, "model",conda_env=conda_env)
return rmse
from hyperopt import hp
import numpy as np
search_space = {
"max_depth" : hp.choice('max_depth', np.arange(5, 15, dtype=int)),
"n_estimators": hp.choice('n_estimators', np.arange(70, 80, dtype=int))
}
from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
# mlflow.pyspark.ml.autolog(log_models=True)
mlflow.xgboost.autolog(log_models=True)
#mlflow.fastai.autolog(log_models=False)
num_evals = 1
trials = Trials()
best_hyperparam = fmin(fn=objective_function,
space=search_space,
algo=tpe.suggest,
max_evals=num_evals,
trials=trials,
rstate=np.random.default_rng(42))
# Retrain model on train & validation dataset and evaluate on test dataset
with mlflow.start_run():
best_max_depth = best_hyperparam["max_depth"]#rf,xgb
best_n_estimators = best_hyperparam["n_estimators"]#xgb
estimator = pipeline.copy({xgbr.max_depth: best_max_depth, xgbr.n_estimators: best_n_estimators})#xgb
pipeline_model = estimator.fit(train_df.limit(188123))
pred_df = pipeline_model.transform(test_df)
rmse = regression_evaluator.evaluate(pred_df)
# Log param and metrics for the final model
mlflow.log_param("maxDepth", best_max_depth)
mlflow.log_param("n_estimators", best_n_estimators)
mlflow.log_metric("rmse", rmse)
โ07-12-2023 11:51 PM - edited โ07-12-2023 11:54 PM
Hello @raghagra ,
We appreciate your question posted in the Databricks community.
Without having a look at the code, it's difficult to determine the exact cause of the issue for the sparkxgbregressor. Could you kindly provide us with the code snippet that is causing this problem?
However, I recommend trying the Random Forest model by setting maxDepth to its default value, you may be able to mitigate the issue. According to the documentation, the recommended value for maxDepth is 5.
Here's an example code snippet:
rfModel = RandomForestRegressor(featuresCol='features', labelCol=target, maxDepth=5, numTrees=50)
Please give it a try and let us know if it helps resolve the issue.
โ07-13-2023 12:25 AM
@Kumaran Thanks for the reply kumaram ๐
The deployment was finally successful for Random Forest algorithm, failing for sparkxgbregressor.
Sharing code snippet:
from xgboost.spark import SparkXGBRegressor
vec_assembler = VectorAssembler(inputCols=train_df.columns[1:], outputCol="features")
#rf = RandomForestRegressor(labelCol="price", maxBins=260, seed=42)
xgbr = SparkXGBRegressor(num_workers=1, label_col="price", missing=0.0)
pipeline = Pipeline(stages=[vec_assembler, xgbr])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
regression_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
def objective_function(params):
# set the hyperparameters that we want to tune
max_depth = params["max_depth"]#rf,xgb
#num_trees = params["num_trees"]#rf
n_estimators = params["n_estimators"]#xgb
with mlflow.start_run():
#estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})#rf
estimator = pipeline.copy({xgbr.max_depth: max_depth, xgbr.n_estimators: n_estimators})#xgbr
model = estimator.fit(train_df)
preds = model.transform(test_df)
rmse = regression_evaluator.evaluate(preds)
#r2 = regression_evaluator2.evaluate(preds)
mlflow.log_metric("rmse", rmse)
# mlflow.spark.log_model(model, "model",conda_env=mlflow.spark.get_default_conda_env())
mlflow.spark.log_model(model, "model",conda_env=conda_env)
return rmse
from hyperopt import hp
import numpy as np
search_space = {
"max_depth" : hp.choice('max_depth', np.arange(5, 15, dtype=int)),
"n_estimators": hp.choice('n_estimators', np.arange(70, 80, dtype=int))
}
from hyperopt import fmin, tpe, Trials
import numpy as np
import mlflow
import mlflow.spark
# mlflow.pyspark.ml.autolog(log_models=True)
mlflow.xgboost.autolog(log_models=True)
#mlflow.fastai.autolog(log_models=False)
num_evals = 1
trials = Trials()
best_hyperparam = fmin(fn=objective_function,
space=search_space,
algo=tpe.suggest,
max_evals=num_evals,
trials=trials,
rstate=np.random.default_rng(42))
# Retrain model on train & validation dataset and evaluate on test dataset
with mlflow.start_run():
best_max_depth = best_hyperparam["max_depth"]#rf,xgb
best_n_estimators = best_hyperparam["n_estimators"]#xgb
estimator = pipeline.copy({xgbr.max_depth: best_max_depth, xgbr.n_estimators: best_n_estimators})#xgb
pipeline_model = estimator.fit(train_df.limit(188123))
pred_df = pipeline_model.transform(test_df)
rmse = regression_evaluator.evaluate(pred_df)
# Log param and metrics for the final model
mlflow.log_param("maxDepth", best_max_depth)
mlflow.log_param("n_estimators", best_n_estimators)
mlflow.log_metric("rmse", rmse)
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group