def build_tune_and_score_model(df: pd.DataFrame):
df.sort_values("Date", inplace=True)
df = df.set_index("Date").asfreq(freq="W-MON")
# Since we'll group the large Spark DataFrame by (Product, SKU)
product = df["Product"].iloc[0]
sku = df["SKU"].iloc[0]
forecast_series = None
y_forecast = None
mlflow_run_id =None
X_train = df[df['dataset_type'] == "train"].set_index("Date").asfreq(freq="W-MON")
X_valid = df[df['dataset_type'] == "validation"].set_index("Date").asfreq(freq="W-MON")
X_test = df[df['dataset_type'] == "test"].set_index("Date").asfreq(freq="W-MON")
y_train = X_train['Sales']
y_valid = X_valid['Sales']
drop_col =['Product', 'SKU','Sales', 'dataset_type']
X_train = X_train.drop(drop_col, axis=1)
X_valid = X_valid.drop(drop_col, axis=1)
with mlflow.start_run(run_name =f'{product}_{sku}_xgboost', nested=True) as run:
mlflow_run_id = run.info.run_id
# Log parameters for the Product-SKU combination within the child run
mlflow.log_param("Product", product)
mlflow.log_param("SKU", sku)
def evaluate_model(h_params😞
# Training
xgb_model = xgb.XGBRegressor(booster='gbtree',
objective='reg:squarederror',
verbose=False,
**h_params)
xgb_model.fit(X_train,
y_train,
eval_set= [(X_train, y_train), (X_valid, y_valid)],
eval_metric="mae",
early_stopping_rounds=100)
# Validation
y_pred = xgb_model.predict(X_valid)
mae = mean_absolute_error(y_valid, y_pred)
# mlflow.log_metric("mae", mae)
return {'status': hyperopt.STATUS_OK, 'loss': mae, 'model': xgb_model}
search_space = {'max_depth': hp.choice('max_depth', [5, 6, 7]),
'gamma': hp.uniform ('gamma', 0,1),
'reg_alpha' : hp.uniform('reg_alpha', 0,50),
'reg_lambda' : hp.uniform('reg_lambda', 10,100),
'colsample_bytree' : hp.uniform('colsample_bytree', 0,1),
'min_child_weight' : hp.uniform('min_child_weight', 0, 5),
'n_estimators': 1000,
'learning_rate': hp.uniform('learning_rate', 0, .15)}
# trials = SparkTrials()
best_hparams = fmin(evaluate_model, search_space, algo=tpe.suggest)
mlflow.log_params(best_params)
# Training
xgb_model = xgb.XGBRegressor(booster='gbtree',
objective='reg:squarederror',
verbose=False,
**best_hparams)
xgb_model.fit(X_train,
y_train,
eval_set=[(X_valid, y_valid)],
eval_metric="mae",
early_stopping_rounds=100)
# Prediction on Training data
y_train_pred = xgb_model.predict(X_test)
mae_hist = mean_absolute_error(y_hist, y_train_pred)
mlflow.log_metric("MAE on Training data", mae_hist)
mlflow.log_figure(xgb.plot_importance(xgb_model))
y_forecast = xgb_model.predict(X_test)
forecast_series = X_test[["Product" , "SKU"]].assign(Time_FiscalWeek = final_df.index.values).assign(Sales_Fitted = y_forecast)
forecast_series['mlflow_run_id'] = mlflow_run_id
forecast_series = forecast_series[["Product" , "SKU", "Date", "Sales_Fitted", "mlflow_run_id"]]
return forecast_series