07-12-2023 09:29 PM
I have been trying to deploy Spark ML models from the experiment page via the UI, but the deployment gets aborted after a long run. Is there any particular reason why this might be happening? I have also taken care of the dependencies, but it is still failing.
Dependency code block:
conda_env = {
    "dependencies": [
        "python=3.10.9",
        {
            "pip": ["xgboost", "pyspark==3.4.0", "pip<=21.2.4"],
        },
    ],
}
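For comparison, the conda.yaml files that MLflow generates usually list pip as a conda dependency next to python (rather than inside the pip package list) and also carry a name and a channel list. Below is a minimal sketch of that layout. The helper name build_conda_env, the environment name "model-env" and the conda-forge channel are assumptions for illustration, not part of MLflow or of the original post, and the thread does not establish whether this layout resolves the aborted deployment.

```python
def build_conda_env(python_version, pip_version_spec, pip_packages,
                    name="model-env", channels=("conda-forge",)):
    """Build a conda environment dict (hypothetical helper, not an MLflow API)."""
    return {
        "name": name,                      # assumed environment name
        "channels": list(channels),        # assumed channel
        "dependencies": [
            f"python={python_version}",    # conda-level Python pin
            f"pip{pip_version_spec}",      # pip itself is a conda dependency
            {"pip": list(pip_packages)},   # packages installed through pip
        ],
    }


conda_env = build_conda_env(
    python_version="3.10.9",
    pip_version_spec="<=21.2.4",
    pip_packages=["xgboost", "pyspark==3.4.0"],
)
```

The resulting dict can be passed as the conda_env argument of mlflow.spark.log_model in the same way as a hand-written one.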
07-12-2023 11:51 PM - edited 07-12-2023 11:54 PM
Hello @raghagra ,
We appreciate your question posted in the Databricks community.
Without seeing the code, it is difficult to determine the exact cause of the issue with SparkXGBRegressor. Could you share the code snippet that triggers the problem?
In the meantime, I recommend trying a Random Forest model with maxDepth left at its default value, which may mitigate the issue. According to the documentation, the default value for maxDepth is 5.
Here's an example code snippet:
rfModel = RandomForestRegressor(featuresCol='features', labelCol=target, maxDepth=5, numTrees=50)
Please give it a try and let us know if it helps resolve the issue.
07-13-2023 12:25 AM
@Kumaran Thanks for the reply, Kumaran 🙂
The deployment finally succeeded for the Random Forest algorithm, but it is still failing for SparkXGBRegressor.
Sharing code snippet:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBRegressor

# Assemble every column except the first into a single feature vector
vec_assembler = VectorAssembler(inputCols=train_df.columns[1:], outputCol="features")
#rf = RandomForestRegressor(labelCol="price", maxBins=260, seed=42)
xgbr = SparkXGBRegressor(num_workers=1, label_col="price", missing=0.0)
pipeline = Pipeline(stages=[vec_assembler, xgbr])
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
regression_evaluator2 = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="r2")
def objective_function(params):
    # set the hyperparameters that we want to tune
    # (cast NumPy integers from the search space to plain Python ints)
    max_depth = int(params["max_depth"])  # rf, xgb
    #num_trees = params["num_trees"]  # rf
    n_estimators = int(params["n_estimators"])  # xgb
    with mlflow.start_run():
        #estimator = pipeline.copy({rf.maxDepth: max_depth, rf.numTrees: num_trees})  # rf
        estimator = pipeline.copy({xgbr.max_depth: max_depth, xgbr.n_estimators: n_estimators})  # xgbr
        model = estimator.fit(train_df)
        preds = model.transform(test_df)
        rmse = regression_evaluator.evaluate(preds)
        #r2 = regression_evaluator2.evaluate(preds)
        mlflow.log_metric("rmse", rmse)
        # mlflow.spark.log_model(model, "model", conda_env=mlflow.spark.get_default_conda_env())
        mlflow.spark.log_model(model, "model", conda_env=conda_env)
    return rmse
from hyperopt import hp
import numpy as np

search_space = {
    "max_depth": hp.choice('max_depth', np.arange(5, 15, dtype=int)),
    "n_estimators": hp.choice('n_estimators', np.arange(70, 80, dtype=int)),
}
from hyperopt import fmin, tpe, Trials
import mlflow
import mlflow.spark

# mlflow.pyspark.ml.autolog(log_models=True)
mlflow.xgboost.autolog(log_models=True)
#mlflow.fastai.autolog(log_models=False)
num_evals = 1
trials = Trials()
best_hyperparam = fmin(
    fn=objective_function,
    space=search_space,
    algo=tpe.suggest,
    max_evals=num_evals,
    trials=trials,
    rstate=np.random.default_rng(42),
)
from hyperopt import space_eval

# fmin returns the index of each hp.choice option, not the option itself,
# so map the indices back to the actual hyperparameter values
best_params = space_eval(search_space, best_hyperparam)
# Retrain the model with the best hyperparameters and evaluate on the test dataset
with mlflow.start_run():
    best_max_depth = int(best_params["max_depth"])  # rf, xgb
    best_n_estimators = int(best_params["n_estimators"])  # xgb
    estimator = pipeline.copy({xgbr.max_depth: best_max_depth, xgbr.n_estimators: best_n_estimators})  # xgb
    pipeline_model = estimator.fit(train_df.limit(188123))
    pred_df = pipeline_model.transform(test_df)
    rmse = regression_evaluator.evaluate(pred_df)
    # Log param and metrics for the final model
    mlflow.log_param("maxDepth", best_max_depth)
    mlflow.log_param("n_estimators", best_n_estimators)
    mlflow.log_metric("rmse", rmse)
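One thing to watch in the retraining step: for parameters defined with hp.choice, the dictionary returned by fmin holds the position of the chosen option in its list, not the option itself. The sketch below illustrates that mapping with plain Python lists so it runs without hyperopt. The names choices, indices_to_values and best_indices, and the index values, are made up for illustration. For a real search space, hyperopt's space_eval performs this lookup.

```python
# Hypothetical stand-ins for the option lists given to hp.choice
# (same ranges as np.arange(5, 15) and np.arange(70, 80)).
choices = {
    "max_depth": list(range(5, 15)),
    "n_estimators": list(range(70, 80)),
}


def indices_to_values(best_indices, choices):
    """Map each hp.choice index back to the option it refers to."""
    return {name: choices[name][idx] for name, idx in best_indices.items()}


# Made-up example of what fmin might return: positions, not values.
best_indices = {"max_depth": 3, "n_estimators": 0}

best_values = indices_to_values(best_indices, choices)
print(best_values)  # {'max_depth': 8, 'n_estimators': 70}
```

Using the raw indices directly would train the final model with max_depth between 0 and 9 instead of between 5 and 14, so it may be worth checking the logged parameters of the final run.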