11-28-2022 11:52 AM
I am trying to save a model after distributed training with the following code:
import sys
import time

import tensorflow as tf
import mlflow.keras
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from spark_tensorflow_distributor import MirroredStrategyRunner

mlflow.keras.autolog()
mlflow.log_param("learning_rate", 0.001)

def train():
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    # tf.distribute.experimental.CollectiveCommunication.NCCL
    model = None
    with strategy.scope():
        data = load_breast_cancer()
        X_train, X_test, y_train, y_test = train_test_split(
            data.data, data.target, test_size=0.3)
        N, D = X_train.shape  # number of observations and variables
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        model = tf.keras.models.Sequential([
            tf.keras.layers.Input(shape=(D,)),
            tf.keras.layers.Dense(1, activation='sigmoid')  # sigmoid output for binary classification
        ])
        model.compile(optimizer='adam',  # Adam: adaptive moment estimation
                      loss='binary_crossentropy',
                      metrics=['accuracy'])
        # Train the model
        r = model.fit(X_train, y_train, validation_data=(X_test, y_test))
        print("Train score:", model.evaluate(X_train, y_train))  # evaluate returns [loss, accuracy]
        mlflow.keras.log_model(model, "mymodel")

MirroredStrategyRunner(num_slots=4, use_custom_strategy=True).run(train)
(MirroredStrategyRunner source: https://github.com/tensorflow/ecosystem/blob/master/spark/spark-tensorflow-distributor/spark_tensorflow_distributor/mirrored_strategy_runner.py)
I have a couple of questions.
11-30-2022 03:25 AM
The ModelCheckpoint callback is used in conjunction with training via model.fit() to save the model or its weights (in a checkpoint file) at some interval, so the model or weights can be loaded later to continue training from the saved state.
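For example, a minimal sketch of wiring ModelCheckpoint into the train() function from the original post; the /dbfs path and save options here are assumptions, not part of the original code:

# Hypothetical checkpoint location; on Databricks a /dbfs/ path is visible to all workers.
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath='/dbfs/tmp/mymodel_checkpoint.h5',
    monitor='val_loss',
    save_best_only=True)  # keep only the best model by validation loss

r = model.fit(X_train, y_train,
              validation_data=(X_test, y_test),
              callbacks=[checkpoint_cb])

The saved file can later be reloaded with tf.keras.models.load_model to resume training from the saved state.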
11-30-2022 12:31 PM
How does ModelCheckpoint know which node is the chief?
Shouldn't there be an API that returns a single resulting model from distributed training?
03-13-2024 10:40 AM
Is there any update on the answer? I am curious too.
Is there a merge operation after all the distributed training has finished?
03-14-2024 06:44 AM
I guess spark_tensorflow_distributor is probably obsolete, since it has not been updated since 2020.
Horovod (https://github.com/horovod) seems a better choice for using TensorFlow with Spark on Databricks.
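For reference, a minimal sketch of what the Horovod route could look like; the use of sparkdl.HorovodRunner (shipped with the Databricks ML runtime) and the np=2 value are assumptions, not something from the original thread:

import tensorflow as tf
import mlflow.keras
import horovod.tensorflow.keras as hvd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sparkdl import HorovodRunner  # assumed available in the Databricks ML runtime

def train_hvd():
    hvd.init()  # one Horovod process per slot
    data = load_breast_cancer()
    X_train, X_test, y_train, y_test = train_test_split(
        data.data, data.target, test_size=0.3)
    model = tf.keras.models.Sequential([
        tf.keras.layers.Input(shape=(X_train.shape[1],)),
        tf.keras.layers.Dense(1, activation='sigmoid')])
    # Wrap the optimizer so gradients are averaged across workers;
    # scaling the learning rate by hvd.size() is a common convention.
    optimizer = hvd.DistributedOptimizer(
        tf.keras.optimizers.Adam(0.001 * hvd.size()))
    model.compile(optimizer=optimizer,
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    model.fit(X_train, y_train, validation_data=(X_test, y_test),
              callbacks=[hvd.callbacks.BroadcastGlobalVariablesCallback(0)])
    if hvd.rank() == 0:  # log exactly one model, from rank 0
        mlflow.keras.log_model(model, "mymodel")

HorovodRunner(np=2).run(train_hvd)

Gating on hvd.rank() == 0 serves the same purpose as the partition check discussed below: only one worker logs the model.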
03-21-2024 06:50 AM
I think I finally worked this out.
Here is the extra code to save the model only once, from the first node:
from pyspark import BarrierTaskContext

context = BarrierTaskContext.get()
if context.partitionId() == 0:
    mlflow.keras.log_model(model, "mymodel")
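This check belongs at the end of train(), replacing the unconditional mlflow.keras.log_model call. It works because MirroredStrategyRunner executes train() as a Spark barrier stage, so every worker can call BarrierTaskContext.get(), and partitionId() distinguishes them; since MultiWorkerMirroredStrategy keeps the model weights synchronized across workers, logging from partition 0 alone gives MLflow exactly one copy instead of num_slots copies.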