cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

MLflow Nested run with applyInPandas does not execute

shubham_lekhwar
New Contributor

I am trying to train an forecasting model along with Hyperparameters tuning with Hyperopt.

I have multiple time series for "KEY" each of which I want to train a separate model. To do this I am using Spark's applyInPandas to tune and train model for each series in parallel.

The applyInPandas is defined within the MLflow's Parent run. The child run's are nested in the train function which is being called by applyInpandas with repartitioning over "KEY"

However, neither the execution progress even after running for more than an hour nor it fails with any exception. 

The code run's fine if I exclude the MLflow tracking.

 
with mlflow.start_run(run_name='Parent_RUN') as parent_run:
  forecast_df = (
    df_merged_sdf
    .repartition(n_tasks, "Product", "SKU")
    .groupBy("Product", "SKU")
    .applyInPandas(build_tune_and_score_model, schema=tuning_schema)
    ).toPandas()
 
 Below is the training function being called in applyInPandas:
 
def build_tune_and_score_model(df: pd.DataFrame):
 
    df.sort_values("Date", inplace=True)
    df = df.set_index("Date").asfreq(freq="W-MON")

    # Since we'll group the large Spark DataFrame by (Product, SKU)
    product = df["Product"].iloc[0]
    sku = df["SKU"].iloc[0]

    forecast_series = None
    y_forecast = None
    mlflow_run_id =None

    X_train = df[df['dataset_type'] == "train"].set_index("Date").asfreq(freq="W-MON")
    X_valid = df[df['dataset_type'] == "validation"].set_index("Date").asfreq(freq="W-MON")
    X_test = df[df['dataset_type'] == "test"].set_index("Date").asfreq(freq="W-MON")

    y_train = X_train['Sales']
    y_valid = X_valid['Sales']
    drop_col =['Product', 'SKU','Sales', 'dataset_type']

    X_train = X_train.drop(drop_col, axis=1)
    X_valid = X_valid.drop(drop_col, axis=1)
 
    with mlflow.start_run(run_name =f'{product}_{sku}_xgboost', nested=True) as run:
        mlflow_run_id = run.info.run_id
   
        # Log parameters for the Product-SKU combination within the child run
        mlflow.log_param("Product", product)
        mlflow.log_param("SKU", sku)

        def evaluate_model(h_params😞
            # Training
            xgb_model = xgb.XGBRegressor(booster='gbtree',
                                         objective='reg:squarederror',
                                         verbose=False,
                                         **h_params)
            xgb_model.fit(X_train,
                          y_train,
                          eval_set= [(X_train, y_train), (X_valid, y_valid)],
                          eval_metric="mae",
                          early_stopping_rounds=100)
     
             # Validation
            y_pred = xgb_model.predict(X_valid)
            mae = mean_absolute_error(y_valid, y_pred)
            # mlflow.log_metric("mae", mae)
            return {'status': hyperopt.STATUS_OK, 'loss': mae, 'model': xgb_model}
 

 
        search_space = {'max_depth': hp.choice('max_depth', [5, 6, 7]),
                        'gamma': hp.uniform ('gamma', 0,1),
                        'reg_alpha' : hp.uniform('reg_alpha', 0,50),
                        'reg_lambda' : hp.uniform('reg_lambda', 10,100),
                        'colsample_bytree' : hp.uniform('colsample_bytree', 0,1),
                        'min_child_weight' : hp.uniform('min_child_weight', 0, 5),
                        'n_estimators': 1000,
                        'learning_rate': hp.uniform('learning_rate', 0, .15)}
   
        # trials = SparkTrials()
        best_hparams = fmin(evaluate_model, search_space, algo=tpe.suggest)

        mlflow.log_params(best_params)

        # Training
        xgb_model = xgb.XGBRegressor(booster='gbtree',
                                         objective='reg:squarederror',
                                         verbose=False,
                                         **best_hparams)
   
        xgb_model.fit(X_train,
                      y_train,
                      eval_set=[(X_valid, y_valid)],
                      eval_metric="mae",
                      early_stopping_rounds=100)

        # Prediction on Training data
        y_train_pred = xgb_model.predict(X_test)
        mae_hist = mean_absolute_error(y_hist, y_train_pred)

        mlflow.log_metric("MAE on Training data", mae_hist)
        mlflow.log_figure(xgb.plot_importance(xgb_model))

        y_forecast = xgb_model.predict(X_test)

    forecast_series = X_test[["Product" , "SKU"]].assign(Time_FiscalWeek = final_df.index.values).assign(Sales_Fitted = y_forecast)
   
    forecast_series['mlflow_run_id'] = mlflow_run_id
    forecast_series = forecast_series[["Product" , "SKU", "Date", "Sales_Fitted",  "mlflow_run_id"]]
    return forecast_series
1 REPLY 1

stbjelcevic
Databricks Employee
Databricks Employee

Hi @shubham_lekhwar ,

This is a common context-passing issue when using Spark with MLflow.

The problem is that the nested=True flag in mlflow.start_run relies on an active run being present in the current process context. Your Parent_RUN is active on the driver node, but the build_tune_and_score_model function executes on worker nodes, which are separate processes and have no knowledge of the driver's active run. This causes the MLflow client on the worker to hang, waiting for a parent context that doesn't exist.

The solution is to manually pass the parent run's ID to the worker function and set the parent-child relationship using a tag.

You need to make two changes: one on the driver and one in your worker function.

On the Driver:

Get the parent_run_id before calling applyInPandas and use functools.partial to "bake" this ID into the function that Spark will distribute.

In the Worker Function (build_tune_and_score_model):

Modify the function signature to accept the new parent_run_id argument. Then, instead of nested=True, start a regular run and manually set the parent ID using mlflow.set_tag.

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now