Machine Learning

MLflow Nested run with applyInPandas does not execute

shubham_lekhwar
New Contributor

I am trying to train a forecasting model with hyperparameter tuning using Hyperopt.

I have multiple time series, one per "KEY", and I want to train a separate model for each of them. To do this, I am using Spark's applyInPandas to tune and train a model for each series in parallel.

The applyInPandas call is made inside the MLflow parent run. The child runs are nested inside the training function, which applyInPandas calls after the data is repartitioned by "KEY".

However, the job neither makes progress, even after running for more than an hour, nor fails with an exception.

The code runs fine if I exclude the MLflow tracking.

 
with mlflow.start_run(run_name='Parent_RUN') as parent_run:
  forecast_df = (
    df_merged_sdf
    .repartition(n_tasks, "Product", "SKU")
    .groupBy("Product", "SKU")
    .applyInPandas(build_tune_and_score_model, schema=tuning_schema)
    ).toPandas()
 
Below is the training function called by applyInPandas:
 
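import hyperopt
import mlflow
import pandas as pd
import xgboost as xgb
from hyperopt import fmin, hp, tpe
from sklearn.metrics import mean_absolute_error

def build_tune_and_score_model(df: pd.DataFrame) -> pd.DataFrame: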
 
    df.sort_values("Date", inplace=True)
    df = df.set_index("Date").asfreq(freq="W-MON")

    # The Spark DataFrame is grouped by (Product, SKU), so every row in this group shares the same values
    product = df["Product"].iloc[0]
    sku = df["SKU"].iloc[0]

    forecast_series = None
    y_forecast = None
    mlflow_run_id = None

    # df is already indexed by Date (set above), so only the weekly frequency is re-applied per split
    X_train = df[df['dataset_type'] == "train"].asfreq(freq="W-MON")
    X_valid = df[df['dataset_type'] == "validation"].asfreq(freq="W-MON")
    X_test = df[df['dataset_type'] == "test"].asfreq(freq="W-MON")

    y_train = X_train['Sales']
    y_valid = X_valid['Sales']
    drop_col = ['Product', 'SKU', 'Sales', 'dataset_type']

    X_train = X_train.drop(drop_col, axis=1)
    X_valid = X_valid.drop(drop_col, axis=1)
 
    with mlflow.start_run(run_name=f'{product}_{sku}_xgboost', nested=True) as run:
        mlflow_run_id = run.info.run_id
   
        # Log parameters for the Product-SKU combination within the child run
        mlflow.log_param("Product", product)
        mlflow.log_param("SKU", sku)

        def evaluate_model(h_params):
            # Training
            xgb_model = xgb.XGBRegressor(booster='gbtree',
                                         objective='reg:squarederror',
                                         verbose=False,
                                         **h_params)
            xgb_model.fit(X_train,
                          y_train,
                          eval_set= [(X_train, y_train), (X_valid, y_valid)],
                          eval_metric="mae",
                          early_stopping_rounds=100)
     
            # Validation
            y_pred = xgb_model.predict(X_valid)
            mae = mean_absolute_error(y_valid, y_pred)
            # mlflow.log_metric("mae", mae)
            return {'status': hyperopt.STATUS_OK, 'loss': mae, 'model': xgb_model}
 

 
        search_space = {'max_depth': hp.choice('max_depth', [5, 6, 7]),
                        'gamma': hp.uniform('gamma', 0, 1),
                        'reg_alpha': hp.uniform('reg_alpha', 0, 50),
                        'reg_lambda': hp.uniform('reg_lambda', 10, 100),
                        'colsample_bytree': hp.uniform('colsample_bytree', 0, 1),
                        'min_child_weight': hp.uniform('min_child_weight', 0, 5),
                        'n_estimators': 1000,
                        'learning_rate': hp.uniform('learning_rate', 0, .15)}
   
        # trials = SparkTrials()
        best_hparams = fmin(evaluate_model, search_space, algo=tpe.suggest)

        mlflow.log_params(best_hparams)

        # Training
        xgb_model = xgb.XGBRegressor(booster='gbtree',
                                     objective='reg:squarederror',
                                     verbose=False,
                                     **best_hparams)
   
        xgb_model.fit(X_train,
                      y_train,
                      eval_set=[(X_valid, y_valid)],
                      eval_metric="mae",
                      early_stopping_rounds=100)

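        # Prediction on training data
        y_train_pred = xgb_model.predict(X_train)
        mae_hist = mean_absolute_error(y_train, y_train_pred)

        mlflow.log_metric("MAE on Training data", mae_hist)
        # plot_importance returns a matplotlib Axes; log_figure needs the Figure and an
        # artifact file name (the name below is chosen for illustration)
        ax = xgb.plot_importance(xgb_model)
        mlflow.log_figure(ax.figure, "feature_importance.png")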

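        # X_test still carries the non-feature columns, so drop them before predicting
        y_forecast = xgb_model.predict(X_test.drop(drop_col, axis=1))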

    # Date is the index of X_test; expose it as a column so it can be selected below
    forecast_series = X_test[["Product", "SKU"]].assign(Date=X_test.index.values).assign(Sales_Fitted=y_forecast)
   
    forecast_series['mlflow_run_id'] = mlflow_run_id
    forecast_series = forecast_series[["Product" , "SKU", "Date", "Sales_Fitted",  "mlflow_run_id"]]
    return forecast_series
