I am running training of a Keras/TensorFlow deep learning model on a cluster of (for now) 2 workers and 1 driver (T4 GPU, 28 GB RAM, 4 cores each) using the Databricks-provided HorovodRunner. Training itself goes well: performance scales nicely across workers (not perfectly linearly, but close enough), and adding Elastic Horovod makes it possible to run the training on spot VMs, which lowers the cost. That also seems to work.
The problem arises when training ends (in my case, when the monitored error hasn't improved for patience epochs) and the model has been logged to MLflow and elsewhere (which also works fine). Somehow, when the function tries to return to the HorovodRunner part (and shut down all processes), something goes wrong. I get the following (partial) stack trace:
[1,0]<stdout>:Model has finished training and stats are printed. Model will now be saved
[1,1]<stdout>:0530-132228-e04hv44m-10-139-64-4:1005:1020 [0] NCCL INFO comm 0x7fa0d8321f20 rank 1 nranks 2 cudaDev 0 busId 100000 - Destroy COMPLETE
[1,0]<stdout>:0530-132228-e04hv44m-10-139-64-7:984:995 [0] NCCL INFO comm 0x7f33d45495b0 rank 0 nranks 2 cudaDev 0 busId 100000 - Destroy COMPLETE
[1,1]<stderr>:*** The MPI_Query_thread() function was called after MPI_FINALIZE was invoked.
[1,1]<stderr>:*** This is disallowed by the MPI standard.
[1,1]<stderr>:*** Your MPI job will now abort.
[1,1]<stderr>:[0530-132228-e04hv44m-10-139-64-4:01005] Local abort after MPI_FINALIZE started completed successfully, but am not able to aggregate error messages, and not able to guarantee that all other processes were killed!
--------------------------------------------------------------------------
Primary job terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
[1,0]<stderr>:2023-05-30 13:36:43.867857: W tensorflow/core/kernels/data/generator_dataset_op.cc:108] Error occurred when finalizing GeneratorDataset iterator: FAILED_PRECONDITION: Python interpreter state is not initialized. The process may be terminated.
[1,0]<stderr>: [[{{node PyFunc}}]]
--------------------------------------------------------------------------
mpirun detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:
Process name: [[57229,1],1]
Exit code: 1
--------------------------------------------------------------------------
The printout of "Model has finished training..." shows that training itself is not the problem, and the model is saved to MLflow correctly.
The following code is a heavily reduced sample of how I run everything. Unfortunately, I can't share more due to company policy, but it should still show all the parts relevant to this problem:
def training(args, mlflow_runid, use_horovod=False, timeline=None):
    import os

    import horovod.tensorflow.keras as hvd
    import mlflow
    import numpy as np
    import tensorflow as tf
    import tensorflow.math as M
    import tensorflow_addons as tfa
    from petastorm import make_batch_reader
    from petastorm.tf_utils import make_petastorm_dataset
    from tensorflow import summary
    from tensorflow.keras import Input, Model, callbacks, layers, losses

    hvd.init()

    # Pin each process to its own GPU and allow memory growth
    gpus = tf.config.list_physical_devices("GPU")
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.set_visible_devices(gpus[hvd.local_rank()], "GPU")

    model = ...  # Keras model (architecture omitted due to company policy)

    optimizer = hvd.DistributedOptimizer(
        tf.keras.optimizers.legacy.Adam(),
        backward_passes_per_step=4,
        average_aggregated_gradients=True,
    )
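    # euclidean_batch_error is a custom metric defined elsewhere. Purely as a
    # hypothetical stand-in (the real definition is omitted), it computes roughly
    # the mean Euclidean distance over the batch:
    #
    #   def euclidean_batch_error(y_true, y_pred):
    #       return M.reduce_mean(M.sqrt(M.reduce_sum(M.square(y_true - y_pred), axis=-1)))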
    model.compile(
        optimizer=optimizer,
        loss=losses.MeanSquaredError(),
        metrics=[euclidean_batch_error],
    )

    with make_batch_reader(
        dataset_url_or_urls=train_dataset_reader_arguments_list[0],
        cache_type="local-disk",
    ) as train_reader, make_batch_reader(
        dataset_url_or_urls=val_dataset_reader_arguments_list[0],
        cache_type="local-disk",
    ) as val_reader:
        print("Dataset readers have been defined")

        # Here I prepare the train and validation data (not important for the problem)
        train_dataset = prepare_dataset(dataset_reader=train_reader)
        training_steps_per_epoch = args.train_df_size // args.batch_size
        val_dataset = prepare_dataset(dataset_reader=val_reader)
        validation_steps_per_epoch = args.val_df_size // args.test_batch_size
        print("Datasets have been parsed and batched")
        # https://sourcegraph.com/github.com/tensorflow/tensorflow@v2.4.1/-/blob/tensorflow/python/keras/optim...
        # One warm-up step so the optimizer variables exist before KerasState captures them
        model.fit(train_dataset, steps_per_epoch=1, epochs=1, callbacks=None)
        state = hvd.elastic.KerasState(model, batch=100, epoch=0)

        def on_state_reset():
            """Callback invoked when the elastic state is reset"""
            # Rescale the learning rate to the new number of workers
            # state.model.optimizer.lr.assign(lr * hvd.size())
            tf.keras.backend.set_value(
                state.model.optimizer.lr, lr * hvd.size()  # lr is defined elsewhere
            )
            # Run a single warm-up step again after the reset
            state.model.fit(
                train_dataset, steps_per_epoch=1, epochs=1, callbacks=None
            )

        state.register_reset_callbacks([on_state_reset])

        horovod_callbacks = [
            hvd.elastic.CommitStateCallback(state, batches_per_commit=100),
            hvd.elastic.UpdateBatchStateCallback(state),
            hvd.elastic.UpdateEpochStateCallback(state),
        ]
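        # callbacks_list is built elsewhere; the only part that matters here is that
        # training stops via early stopping once the monitored error has not improved
        # for patience epochs. As a rough, hypothetical sketch:
        #   callbacks_list = [
        #       callbacks.EarlyStopping(monitor="val_loss", patience=args.patience),
        #       ...
        #   ]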
        all_callbacks = (
            [hvd.callbacks.MetricAverageCallback()]
            + horovod_callbacks
            + [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
            + callbacks_list
        )

        @hvd.elastic.run
        def train(state):
            hist = state.model.fit(
                train_dataset,
                steps_per_epoch=training_steps_per_epoch,
                epochs=args.n_epochs - state.epoch,
                validation_data=val_dataset,
                validation_steps=validation_steps_per_epoch,
                callbacks=all_callbacks,
                verbose=verbosity,
            )
            return state.model, hist

        model, hist = train(state)

        print(
            hist.history["val_loss"][-1],
            hist.history["val_euclidean_batch_error"][-1],
        )
        print(
            "Model has finished training and stats are printed. Model will now be saved"
        )

        # Rank 0 is the master node, so that one saves the model
        if hvd.rank() == 0:
            signature = get_signature_mlflow(args, landmarks_shape)
            mlflow.tensorflow.log_model(
                model,
                args.experiment_name,
                signature=signature,
                conda_env=mlflow.spark.get_default_conda_env(),
            )

        hvd.shutdown()
        return (
            hist.history["val_loss"][-1],
            hist.history["val_euclidean_batch_error"][-1],
        )
This is run using the following code:
hr = HorovodRunner(np=new_args.n_gpus, driver_log_verbosity="all")
val_loss, val_euclideandistance = hr.run(
    training,
    args=new_args,
    mlflow_runid=active_run_uuid,
    use_horovod=args.use_horovod,
)
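Here np=new_args.n_gpus is 2 in the setup described above, i.e. one training process per worker GPU. As far as I understand the HorovodRunner API, a negative np instead runs -np local processes on the driver, which would take the multi-worker MPI teardown out of the picture while narrowing this down:

hr_local = HorovodRunner(np=-1, driver_log_verbosity="all")  # single process on the driver
hr_local.run(
    training,
    args=new_args,
    mlflow_runid=active_run_uuid,
    use_horovod=args.use_horovod,
)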
It would be great if someone could help me get around this error. Feel free to ask more questions to narrow the problem down further.
I've already tried leaving out the hvd.shutdown() call, and I've also tried not saving the model in that place. To me it really looks like something internal to Horovod is the issue, causing MPI_Query_thread() to be called after Horovod has already invoked MPI_FINALIZE. Looking forward to hearing from you!
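For reference, the no-shutdown variant mentioned above simply drops the explicit call and keeps the same ending (trimmed to the relevant lines):

        if hvd.rank() == 0:
            signature = get_signature_mlflow(args, landmarks_shape)
            mlflow.tensorflow.log_model(
                model,
                args.experiment_name,
                signature=signature,
                conda_env=mlflow.spark.get_default_conda_env(),
            )
        # no hvd.shutdown() here; teardown is left to HorovodRunner/MPI
        return (
            hist.history["val_loss"][-1],
            hist.history["val_euclidean_batch_error"][-1],
        )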