Running Keras model training with HorovodRunner works until the training function is exited ("The MPI_Query_thread() function was called after MPI_FINALIZE was invoked.")

DataBRObin
New Contributor III

I am training a Keras/TensorFlow deep learning model on a cluster of (for now) 2 workers and 1 driver (T4 GPU, 28 GB, 4 cores) using the Databricks-provided HorovodRunner. Training itself goes well: performance scales nicely over multiple workers (not perfectly linearly, but close enough), and adding Elastic Horovod makes it possible to run the training on spot VMs, which lowers the cost. That also appears to work.

The problem arises when the training process ends (in my case, when the error hasn't improved for the patience number of epochs) and the model is logged to MLflow, among other things (which also works fine). Somehow, when the function tries to return to the HorovodRunner part (and shut down all processes), something goes wrong. I get the following (partial) stack trace:

[1,0]<stdout>:Model has finished training and stats are printed. Model will now be saved
[1,1]<stdout>:0530-132228-e04hv44m-10-139-64-4:1005:1020 [0] NCCL INFO comm 0x7fa0d8321f20 rank 1 nranks 2 cudaDev 0 busId 100000 - Destroy COMPLETE
[1,0]<stdout>:0530-132228-e04hv44m-10-139-64-7:984:995 [0] NCCL INFO comm 0x7f33d45495b0 rank 0 nranks 2 cudaDev 0 busId 100000 - Destroy COMPLETE
[1,1]<stderr>:*** The MPI_Query_thread() function was called after MPI_FINALIZE was invoked.
[1,1]<stderr>:*** This is disallowed by the MPI standard.
[1,1]<stderr>:*** Your MPI job will now abort.
[1,1]<stderr>:[0530-132228-e04hv44m-10-139-64-4:01005] Local abort after MPI_FINALIZE started completed successfully, but am not able to aggregate error messages, and not able to guarantee that all other processes were killed!
--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
[1,0]<stderr>:2023-05-30 13:36:43.867857: W tensorflow/core/kernels/data/generator_dataset_op.cc:108] Error occurred when finalizing GeneratorDataset iterator: FAILED_PRECONDITION: Python interpreter state is not initialized. The process may be terminated.
[1,0]<stderr>:	 [[{{node PyFunc}}]]
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:
 
  Process name: [[57229,1],1]
  Exit code:    1
--------------------------------------------------------------------------

The printout of "Model has finished training..." shows that training itself is not the problem, and the model is saved to MLflow correctly.

The following code is a very condensed sample of how I run everything. Unfortunately, I am not able to share more due to company policy, but it should still show all the parts relevant to this problem:

def training(args, mlflow_runid, use_horovod=False, timeline=None):
    import os
 
    import horovod.tensorflow.keras as hvd
    import mlflow
    import numpy as np
    import tensorflow as tf
    import tensorflow.math as M
    import tensorflow_addons as tfa
    from petastorm import make_batch_reader
    from petastorm.tf_utils import make_petastorm_dataset
    from tensorflow import summary
    from tensorflow.keras import Input, Model, callbacks, layers, losses
 
    hvd.init()

    # Pin each worker process to a single GPU based on its local rank,
    # and allow memory growth so TensorFlow does not grab all GPU memory.
    gpus = tf.config.list_physical_devices("GPU")
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.set_visible_devices(gpus[hvd.local_rank()], "GPU")
   
    model = ...  # Keras model definition omitted (company policy)
  
    optimizer = hvd.DistributedOptimizer(
        tf.keras.optimizers.legacy.Adam(),
        backward_passes_per_step=4,
        average_aggregated_gradients=True,
    )
  
    model.compile(
        optimizer=optimizer,
        loss=losses.MeanSquaredError(),
        metrics=[euclidean_batch_error],
    )
 
    with make_batch_reader(
        dataset_url_or_urls=train_dataset_reader_arguments_list[0],
        cache_type="local-disk",
    ) as train_reader, make_batch_reader(
        dataset_url_or_urls=val_dataset_reader_arguments_list[0],
        cache_type="local-disk",
    ) as val_reader:
        print("Dataset readers have been defined")
 
        # Here I prepare the train and validation datasets (not important for the problem)
        train_dataset = prepare_dataset(dataset_reader=train_reader)
 
        training_steps_per_epoch = args.train_df_size // (args.batch_size)
 
        val_dataset = prepare_dataset(dataset_reader=val_reader)
 
        validation_steps_per_epoch = args.val_df_size // (args.test_batch_size)
 
        print("Datasets have been parsed and batched")
 
        # https://sourcegraph.com/github.com/tensorflow/tensorflow@v2.4.1/-/blob/tensorflow/python/keras/optim...
        model.fit(train_dataset, steps_per_epoch=1, epochs=1, callbacks=None)
 
        state = hvd.elastic.KerasState(model, batch=100, epoch=0)
 
        def on_state_reset():
            """Callback invoked when the state is reset"""
            # state.model.optimizer.lr.assign(lr * hvd.size())
            tf.keras.backend.set_value(
                state.model.optimizer.lr, lr * hvd.size()
            )
            state.model.fit(
                train_dataset, steps_per_epoch=1, epochs=1, callbacks=None
            )
 
        state.register_reset_callbacks([on_state_reset])
 
        horovod_callbacks = [
            hvd.elastic.CommitStateCallback(state, batches_per_commit=100),
            hvd.elastic.UpdateBatchStateCallback(state),
            hvd.elastic.UpdateEpochStateCallback(state),
        ]
 
        all_callbacks = (
            [hvd.callbacks.MetricAverageCallback()]
            + horovod_callbacks
            + [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
            + callbacks_list
        )
 
        @hvd.elastic.run
        def train(state):
            hist = state.model.fit(
                train_dataset,
                steps_per_epoch=training_steps_per_epoch,
                epochs=args.n_epochs - state.epoch,
                validation_data=val_dataset,
                validation_steps=validation_steps_per_epoch,
                callbacks=all_callbacks,
                verbose=verbosity,
            )
 
            return state.model, hist
 
        model, hist = train(state)
  
        print(
            hist.history["val_loss"][-1],
            hist.history["val_euclidean_batch_error"][-1],
        )
 
    print(
        "Model has finished training and stats are printed. Model will now be saved"
    )
 
    # Rank 0 is the master node, so that is the one that should save the model
    if hvd.rank() == 0:
        signature = get_signature_mlflow(args, landmarks_shape)

        mlflow.tensorflow.log_model(
            model,
            args.experiment_name,
            signature=signature,
            conda_env=mlflow.spark.get_default_conda_env(),
        )
    
    hvd.shutdown()
 
    return (
        hist.history["val_loss"][-1],
        hist.history["val_euclidean_batch_error"][-1],
    )

This is run using the following code:

hr = HorovodRunner(np=new_args.n_gpus, driver_log_verbosity="all")
val_loss, val_euclideandistance = hr.run(
    training,
    args=new_args,
    mlflow_runid=active_run_uuid, 
    use_horovod=args.use_horovod,
)

It would be great if someone could help me get around this error. Feel free to ask more questions to narrow the problem down further.

I've already tried leaving out the hvd.shutdown() call, and I've also tried not saving the model at that point. To me it really looks like something internal to Horovod is the issue, causing MPI_Query_thread() to be called after MPI_FINALIZE has already been invoked by Horovod. Looking forward to hearing from you!
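For completeness, this is roughly what the end of the training function looks like in the variants I tried (a minimal sketch with an illustrative finish_training() wrapper, not my actual code; it assumes hvd.is_initialized() is available in the Horovod version bundled with the ML runtime):

import horovod.tensorflow.keras as hvd

def finish_training(hist):
    # hist is the History object returned by the elastic train() wrapper above.
    # Variant 1: skip hvd.shutdown() entirely and let HorovodRunner / mpirun
    # tear the worker processes down on their own.
    # Variant 2: only call the explicit shutdown while Horovod still reports
    # itself as initialized (assumes hvd.is_initialized() exists in the
    # installed Horovod version).
    if hvd.is_initialized():
        hvd.shutdown()

    return (
        hist.history["val_loss"][-1],
        hist.history["val_euclidean_batch_error"][-1],
    )

Neither variant made a difference for me, which is why I suspect the shutdown path inside Horovod/HorovodRunner itself.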

2 REPLIES

sean_owen
Databricks Employee

I personally suspect it's your callbacks. Can you remove all those state callbacks and see if that is it?
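Something like this, for example (a rough sketch based on your own snippet, with the elastic state machinery taken out just to isolate the problem; model, train_dataset, val_dataset, the step counts, args, callbacks_list and verbosity are the variables you already defined):

import horovod.tensorflow.keras as hvd

# Plain (non-elastic) Horovod Keras callbacks only, to test whether the
# elastic state callbacks are what triggers the MPI_FINALIZE abort.
test_callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
] + callbacks_list

hist = model.fit(
    train_dataset,
    steps_per_epoch=training_steps_per_epoch,
    epochs=args.n_epochs,
    validation_data=val_dataset,
    validation_steps=validation_steps_per_epoch,
    callbacks=test_callbacks,
    verbose=verbosity,
)

If the job then shuts down cleanly, the elastic pieces are the place to look further.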

DataBRObin
New Contributor III

Hi Sean, thank you for getting back to me, and my apologies for the delayed response. Unfortunately, those state callbacks are required for Elastic Horovod to work (https://horovod.readthedocs.io/en/stable/elastic_include.html), which I want to use because it allows me to run on spot VMs and greatly reduce cost.

In case I am using it wrong, or if there is something else I should do to make Elastic Horovod and HorovodRunner work together, let me know and I'll give it a try!
