MLflow Nested run with applyInPandas does not execute
Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
3 weeks ago
I am trying to train an forecasting model along with Hyperparameters tuning with Hyperopt.
I have multiple time series for "KEY" each of which I want to train a separate model. To do this I am using Spark's applyInPandas to tune and train model for each series in parallel.
The applyInPandas is defined within the MLflow's Parent run. The child run's are nested in the train function which is being called by applyInpandas with repartitioning over "KEY"
However, neither the execution progress even after running for more than an hour nor it fails with any exception.
The code run's fine if I exclude the MLflow tracking.
with mlflow.start_run(run_name='Parent_RUN') as parent_run:
forecast_df = (
df_merged_sdf
.repartition(n_tasks, "Product", "SKU")
.groupBy("Product", "SKU")
.applyInPandas(build_tune_and_score_model, schema=tuning_schema)
).toPandas()
Below is the training function being called in applyInPandas:
def build_tune_and_score_model(df: pd.DataFrame):
df.sort_values("Date", inplace=True)
df = df.set_index("Date").asfreq(freq="W-MON")
# Since we'll group the large Spark DataFrame by (Product, SKU)
product = df["Product"].iloc[0]
sku = df["SKU"].iloc[0]
forecast_series = None
y_forecast = None
mlflow_run_id =None
X_train = df[df['dataset_type'] == "train"].set_index("Date").asfreq(freq="W-MON")
X_valid = df[df['dataset_type'] == "validation"].set_index("Date").asfreq(freq="W-MON")
X_test = df[df['dataset_type'] == "test"].set_index("Date").asfreq(freq="W-MON")
y_train = X_train['Sales']
y_valid = X_valid['Sales']
drop_col =['Product', 'SKU','Sales', 'dataset_type']
X_train = X_train.drop(drop_col, axis=1)
X_valid = X_valid.drop(drop_col, axis=1)
with mlflow.start_run(run_name =f'{product}_{sku}_xgboost', nested=True) as run:
mlflow_run_id = run.info.run_id
# Log parameters for the Product-SKU combination within the child run
mlflow.log_param("Product", product)
mlflow.log_param("SKU", sku)
def evaluate_model(h_params😞
# Training
xgb_model = xgb.XGBRegressor(booster='gbtree',
objective='reg:squarederror',
verbose=False,
**h_params)
xgb_model.fit(X_train,
y_train,
eval_set= [(X_train, y_train), (X_valid, y_valid)],
eval_metric="mae",
early_stopping_rounds=100)
# Validation
y_pred = xgb_model.predict(X_valid)
mae = mean_absolute_error(y_valid, y_pred)
# mlflow.log_metric("mae", mae)
return {'status': hyperopt.STATUS_OK, 'loss': mae, 'model': xgb_model}
search_space = {'max_depth': hp.choice('max_depth', [5, 6, 7]),
'gamma': hp.uniform ('gamma', 0,1),
'reg_alpha' : hp.uniform('reg_alpha', 0,50),
'reg_lambda' : hp.uniform('reg_lambda', 10,100),
'colsample_bytree' : hp.uniform('colsample_bytree', 0,1),
'min_child_weight' : hp.uniform('min_child_weight', 0, 5),
'n_estimators': 1000,
'learning_rate': hp.uniform('learning_rate', 0, .15)}
# trials = SparkTrials()
best_hparams = fmin(evaluate_model, search_space, algo=tpe.suggest)
mlflow.log_params(best_params)
# Training
xgb_model = xgb.XGBRegressor(booster='gbtree',
objective='reg:squarederror',
verbose=False,
**best_hparams)
xgb_model.fit(X_train,
y_train,
eval_set=[(X_valid, y_valid)],
eval_metric="mae",
early_stopping_rounds=100)
# Prediction on Training data
y_train_pred = xgb_model.predict(X_test)
mae_hist = mean_absolute_error(y_hist, y_train_pred)
mlflow.log_metric("MAE on Training data", mae_hist)
mlflow.log_figure(xgb.plot_importance(xgb_model))
y_forecast = xgb_model.predict(X_test)
forecast_series = X_test[["Product" , "SKU"]].assign(Time_FiscalWeek = final_df.index.values).assign(Sales_Fitted = y_forecast)
forecast_series['mlflow_run_id'] = mlflow_run_id
forecast_series = forecast_series[["Product" , "SKU", "Date", "Sales_Fitted", "mlflow_run_id"]]
return forecast_series
0 REPLIES 0

