08-23-2022 10:45 AM
I am trying to distribute hyperparameter tuning using hyperopt on a tensorflow.keras model. I am using SparkTrials in my fmin:
spark_trials = SparkTrials(parallelism=4)
...
best_hyperparam = fmin(fn=CNN_HOF,
                       space=space,
                       algo=tpe.suggest,
                       max_evals=tuner_max_evals,
                       trials=spark_trials)
but I am receiving this error:
TypeError: cannot pickle '_thread.lock' object
The only way the code works is if I skip passing the trials object by commenting out the line trials=spark_trials, which means there is no distributed tuning.
Any idea how I can fix this?
@Tian Tan
@Sara Dooley
08-26-2022 02:48 PM
This can happen when you try to serialize a Keras model with an unserializable layer. What does your model look like? Also, what is in that search space variable? What are you trying to optimize on?
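For reference, this is the class of failure in play; a minimal sketch (deliberately unrelated to your code) that raises the same TypeError:

import pickle
import threading

# SparkTrials has to pickle the objective function and everything it
# closes over. Any object graph containing a lock (a Keras data
# generator, for example, holds one internally) fails like this:
try:
    pickle.dumps(threading.Lock())
except TypeError as e:
    print(e)  # cannot pickle '_thread.lock' object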
08-26-2022 02:59 PM
Here is more of the code, and some details:
space = {
    "pool_1": hp.choice('pool_1', np.arange(2, 5, 1, dtype=int)),
    "conv_1": hp.choice('conv_1', np.arange(16, 128, 16, dtype=int)),
    "conv_1b": hp.choice('conv_1b', np.arange(2, 5, 1, dtype=int)),
    "pool_2": hp.choice('pool_2', np.arange(2, 5, 1, dtype=int)),
    "reg_2": hp.choice('reg_2', np.arange(0.00005, 0.01, 0.00001, dtype=float)),
    "conv_2": hp.choice("conv_2", np.arange(16, 128, 16, dtype=int)),
    "conv_2b": hp.choice("conv_2b", np.arange(2, 5, 1, dtype=int)),
    "pool_3": hp.choice('pool_3', np.arange(2, 5, 1, dtype=int)),
    "reg_3": hp.choice('reg_3', np.arange(0.00005, 0.01, 0.00001, dtype=float)),
    "conv_3": hp.choice("conv_3", np.arange(16, 128, 16, dtype=int)),
    "conv_3b": hp.choice("conv_3b", np.arange(2, 5, 1, dtype=int)),
    "pool_4": hp.choice('pool_4', np.arange(2, 5, 1, dtype=int)),
    "reg_4": hp.choice('reg_4', np.arange(0.00005, 0.01, 0.00001, dtype=float)),
    "conv_4": hp.choice("conv_4", np.arange(16, 128, 16, dtype=int)),
    "conv_4b": hp.choice("conv_4b", np.arange(2, 5, 1, dtype=int)),
    "drop_4": hp.choice('drop_4', np.arange(0.00005, 0.01, 0.00001, dtype=float))
}
def model_builder(params, dense_size):  # CNN builder function
    model = Sequential()
    # Four conv/pool blocks; filter counts, kernel sizes, pool sizes,
    # and L1L2 regularization strengths all come from the search space.
    model.add(Conv2D(int(params['conv_1']), (int(params['conv_1b']), int(params['conv_1b'])), activation='relu', input_shape=(150, 150, 3)))
    model.add(MaxPooling2D(int(params['pool_1']), int(params['pool_1'])))
    model.add(Conv2D(int(params['conv_2']), (int(params['conv_2b']), int(params['conv_2b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_2']), float(params['reg_2']))))
    model.add(MaxPooling2D(int(params['pool_2']), int(params['pool_2'])))
    model.add(Conv2D(int(params['conv_3']), (int(params['conv_3b']), int(params['conv_3b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_3']), float(params['reg_3']))))
    model.add(MaxPooling2D(int(params['pool_3']), int(params['pool_3'])))
    model.add(Conv2D(int(params['conv_4']), (int(params['conv_4b']), int(params['conv_4b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_4']), float(params['reg_4']))))
    model.add(MaxPooling2D(int(params['pool_4']), int(params['pool_4'])))
    model.add(Dropout(float(params['drop_4'])))
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dense(dense_size, activation='softmax'))
    return model
def CNN_HOF(params):  # Hyperopt objective function
    mlflow.tensorflow.autolog()
    model = model_builder(params, dense_size)
    model.compile(loss="categorical_crossentropy",
                  optimizer=Adam(),
                  metrics=["accuracy"])
    history = model.fit(train_generator,
                        steps_per_epoch=train_step,
                        epochs=tuner_epochs,
                        validation_data=valid_generator,
                        validation_steps=valid_step,
                        verbose=2)
    # Evaluate the model
    score = model.evaluate(test_generator, steps=1, verbose=0)
    obj_metric = score[0]
    return {"loss": obj_metric, "status": STATUS_OK}
spark_trials = SparkTrials(parallelism=4)
...
with mlflow.start_run(run_name=model_name+"_Tuning"):
    best_hyperparam = fmin(fn=CNN_HOF,
                           space=space,
                           algo=tpe.suggest,
                           max_evals=tuner_max_evals,
                           early_stop_fn=no_progress_loss(10),
                           trials=spark_trials)
This is the complete error message:
TypeError Traceback (most recent call last)
<command-2155252138731800> in <module>
1 if tuning:
----> 2 Hyperparameter_tuning(model_name)
<command-3238776031025884> in Hyperparameter_tuning(model_name)
2 with mlflow.start_run(run_name=model_name+"_Tuning"):
3 # mlflow.tensorflow.autolog()
----> 4 best_hyperparam = fmin(fn=CNN_HOF,
5 space=space,
6 algo=tpe.suggest,
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
563
564 if allow_trials_fmin and hasattr(trials, "fmin"):
--> 565 return trials.fmin(
566 fn,
567 space,
/databricks/.python_edge_libs/hyperopt/instrumentation.py in instrumented(func, self, args, kwargs)
25 )
26 try:
---> 27 return_val = func(*args, **kwargs)
28 except Exception as exc:
29 error_string = "{} with message: {}".format(type(exc).__name__, str(exc))
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
311 except BaseException as e:
312 logger.debug("fmin thread exits with an exception raised.")
--> 313 raise e
314 else:
315 logger.debug("fmin thread exits normally.")
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
283 )
284
--> 285 res = fmin(
286 fn,
287 space,
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
592 domain = base.Domain(fn, space, pass_expr_memo_ctrl=pass_expr_memo_ctrl)
593
--> 594 rval = FMinIter(
595 algo,
596 domain,
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
180 )
181 else:
--> 182 raise e
183 trials.attachments["FMinIter_Domain"] = msg
184
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
163 logger.warning("over-writing old domain trials attachment")
164 try:
--> 165 msg = pickler.dumps(domain)
166 except TypeError as e:
167 if "cannot pickle '_thread.RLock' object" in str(e):
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
75
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
TypeError: cannot pickle '_thread.lock' object
08-30-2022 10:23 AM
This is a summary of the cluster:
Workers: 2-8 (32-128 GB memory, 8-32 cores)
Driver: 1 (16 GB memory, 4 cores)
Runtime: 10.5.x-gpu-ml-scala2.12
(3) What is the value set for tuner_max_evals? Right now it is set to 36, but we receive the error with every value we try.
08-30-2022 10:59 AM
From looking through your full traceback, the failure starts in FMinIter, which then attempts a Pickler dump, and that dump is what throws this specific TypeError. I think the true error we are chasing here is the FMinIter error.
First, try testing the search space on something simpler (maybe only the pool parameter for one layer at first). Additionally, I would use hp.quniform with min >= 1, such as this:
search_space = {
    "pool_1": hp.quniform("pool_1", 2, 5, 1),
}
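One thing to keep in mind if you try this: hp.quniform samples floats, so either keep the int(...) casts in model_builder or cast inside the space itself. A sketch of the latter, using hyperopt's pyll scope:

from hyperopt import hp
from hyperopt.pyll import scope

# scope.int wraps the sampled float so the objective receives a plain
# int; this avoids repeating int(...) casts in model_builder.
search_space = {
    "pool_1": scope.int(hp.quniform("pool_1", 2, 5, 1)),
}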
Another thought is that you need to load your train_generator & test_generator inside the objective function, which I'm assuming could look something like this:
train_generator = datagen.flow_from_directory(directory="/image/path",
                                              class_mode="binary",
                                              classes=["normal", "abnormal"],
                                              batch_size=batch_size,
                                              target_size=(img_height, img_width))
08-30-2022 11:02 AM
Lastly, the output of the objective function might not be serializable by pickle. Double-check that you are returning a plain double and not something more complex.
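If in doubt, a defensive pattern is to coerce the metric before returning; a sketch (assuming score is the list returned by model.evaluate):

from hyperopt import STATUS_OK

def finish_objective(score):
    # score is the list from model.evaluate ([loss, accuracy]);
    # float() forces any numpy scalar to a plain Python float
    # that pickle can handle.
    return {"loss": float(score[0]), "status": STATUS_OK}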
08-30-2022 03:27 PM
I changed the code in the way you suggested. It was not working with SparkTrials(); it only worked with Trials(), which I believe means there is no distributed tuning, since we only call single-machine algorithms rather than distributed training algorithms such as MLlib or Horovod.
How can we make sure whether using Trials means distributed tuning or not?
You can take a look at the notebook at this address:
08-31-2022 09:01 AM
Sorry, I do not have access to your workspace.
So your objective function just returned a simple double (the accuracy) and it threw the exact same error? Then the question I have is: what does the generator look like?
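As for checking whether the trials are actually distributed, one quick sanity check (my own habit, not an official API) is to log the hostname inside a throwaway objective; with SparkTrials you should see worker hostnames (in the executor logs), while with plain Trials every trial reports the driver:

import socket
from hyperopt import fmin, tpe, hp, SparkTrials

def where_did_i_run(params):
    # With SparkTrials each trial runs in a Spark task on a worker, so
    # this prints varying worker hostnames; with Trials it always
    # prints the driver's hostname.
    print("trial ran on:", socket.gethostname())
    return 0.0  # trivially serializable dummy loss

fmin(fn=where_did_i_run,
     space={"x": hp.uniform("x", 0, 1)},
     algo=tpe.suggest,
     max_evals=4,
     trials=SparkTrials(parallelism=2))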
08-31-2022 10:21 AM
Using SparkTrials, I am now receiving this error, not the same error I was receiving before:
BadObjectiveFunction: When using `fmin` asynchronously, distributed algorithms or distributed objects may not be used within the objective function. This includes algorithms from Apache Spark ML and data objects like Spark DataFrames. In order to use Apache Spark in the objective function, use `Trials` instead of `SparkTrials`. To instead use `fmin` for single-machine ML like scikit-learn, make sure the objective function does not reference a Spark DataFrame or a distributed algorithm. See the following docs for more details on using Spark with Hyperopt: https://hyperopt.github.io/hyperopt/scaleout/spark
TypeError Traceback (most recent call last)
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
164 try:
--> 165 msg = pickler.dumps(domain)
166 except TypeError as e:
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
TypeError: cannot pickle '_thread.RLock' object
During handling of the above exception, another exception occurred:
BadObjectiveFunction Traceback (most recent call last)
<command-1496814655941666> in <module>
----> 1 Hyperparameter_tuning(model_name)
<command-1496814655941665> in Hyperparameter_tuning(model_name)
2 with mlflow.start_run(run_name=model_name+"_Tuning"):
3 # mlflow.tensorflow.autolog()
----> 4 best_hyperparam = fmin(fn=CNN_HOF,
5 space=space,
6 algo=tpe.suggest,
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
563
564 if allow_trials_fmin and hasattr(trials, "fmin"):
--> 565 return trials.fmin(
566 fn,
567 space,
/databricks/.python_edge_libs/hyperopt/instrumentation.py in instrumented(func, self, args, kwargs)
25 )
26 try:
---> 27 return_val = func(*args, **kwargs)
28 except Exception as exc:
29 error_string = "{} with message: {}".format(type(exc).__name__, str(exc))
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
311 except BaseException as e:
312 logger.debug("fmin thread exits with an exception raised.")
--> 313 raise e
314 else:
315 logger.debug("fmin thread exits normally.")
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
283 )
284
--> 285 res = fmin(
286 fn,
287 space,
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
592 domain = base.Domain(fn, space, pass_expr_memo_ctrl=pass_expr_memo_ctrl)
593
--> 594 rval = FMinIter(
595 algo,
596 domain,
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
166 except TypeError as e:
167 if "cannot pickle '_thread.RLock' object" in str(e):
--> 168 raise BadObjectiveFunction(
169 "When using `fmin` asynchronously, distributed algorithms or "
170 "distributed objects may not be used within the objective function. "
When switching to `Trials`, it works, but I doubt it is distributed.
The image generator looks like this:
def img_generator(train_df, valid_df, test_df):
    train_df_count = train_df.count()
    result = json.loads(dbutils.notebook.run("Batch_step_size", 3600, {"dataframe_count": train_df_count}))
    train_batch = result['batch']
    train_step = result['step']
    img_prep_function = None
    if image_augmentation:
        train_data_gen = ImageDataGenerator(rescale=1.0/255,
                                            rotation_range=40,
                                            width_shift_range=0.2,
                                            height_shift_range=0.2,
                                            shear_range=2.0,
                                            zoom_range=0.2,
                                            horizontal_flip=True,
                                            fill_mode='nearest',
                                            preprocessing_function=img_prep_function)
    else:
        train_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
    train_generator = train_data_gen.flow_from_dataframe(dataframe=train_df.toPandas(),
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=train_batch)
    valid_df_count = valid_df.count()
    result = json.loads(dbutils.notebook.run("Batch_step_size", 3600, {"dataframe_count": valid_df_count}))
    valid_batch = result['batch']
    valid_step = result['step']
    valid_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
    valid_generator = valid_data_gen.flow_from_dataframe(dataframe=valid_df.toPandas(),
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=valid_batch,
                                                         shuffle=False,
                                                         seed=42)
    test_df_count = test_df.count()
    result = json.loads(dbutils.notebook.run("Batch_step_size", 3600, {"dataframe_count": test_df_count}))
    test_batch = result['batch']
    test_step = result['step']
    test_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
    test_generator = test_data_gen.flow_from_dataframe(dataframe=test_df.toPandas(),
                                                       directory=images_dir,
                                                       x_col='filename',
                                                       y_col=target,
                                                       target_size=(150, 150),
                                                       class_mode='categorical',
                                                       batch_size=test_batch,
                                                       shuffle=False,
                                                       seed=42)
    return train_generator, train_step, train_batch, valid_generator, valid_step, test_generator, test_step
09-01-2022 09:53 AM
Try the below:
def CNN_HOF(train_df_pd, valid_df_pd, test_df_pd, params):  # Hyperopt objective function
    train_generator = train_data_gen.flow_from_dataframe(dataframe=train_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=train_batch)
    valid_generator = valid_data_gen.flow_from_dataframe(dataframe=valid_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=valid_batch,
                                                         shuffle=False,
                                                         seed=42)
    test_generator = test_data_gen.flow_from_dataframe(dataframe=test_df_pd,
                                                       directory=images_dir,
                                                       x_col='filename',
                                                       y_col=target,
                                                       target_size=(150, 150),
                                                       class_mode='categorical',
                                                       batch_size=test_batch,
                                                       shuffle=False,
                                                       seed=42)
    mlflow.tensorflow.autolog()
    model = model_builder(params, dense_size)
    model.compile(loss="categorical_crossentropy",
                  optimizer=Adam(),
                  metrics=["accuracy"])
    history = model.fit(train_generator,
                        steps_per_epoch=train_step,
                        epochs=tuner_epochs,
                        validation_data=valid_generator,
                        validation_steps=valid_step,
                        verbose=2)
    # Evaluate the model
    score = model.evaluate(test_generator, steps=1, verbose=0)
    obj_metric = score[0]
    return float(obj_metric)
I'm assuming your dense_size, valid_batch, train_batch, test_batch, images_dir, and target are global variables. Note that train_df_pd = train_df.toPandas(), valid_df_pd = valid_df.toPandas(), and test_df_pd = test_df.toPandas() happen outside the objective function, and the pandas dataframes are brought in as arguments; the generators are then built inside the objective function. If this works, then the generator was the issue, and we can do something to speed up this process.
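One caveat with this signature: fmin only passes params to the objective, so if you go this route the three dataframes would need to be bound first, e.g. with functools.partial (a sketch, names taken from the snippet above):

from functools import partial

# partial fills the leading positional arguments, leaving fmin free
# to supply params as the final argument.
objective = partial(CNN_HOF, train_df_pd, valid_df_pd, test_df_pd)

best_hyperparam = fmin(fn=objective,
                       space=space,
                       algo=tpe.suggest,
                       max_evals=tuner_max_evals,
                       trials=spark_trials)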
I also want to note that I am returning the score as a float. I'm wondering if that score[0] is a dictionary rather than a float, so I cast it, but you can print its type to validate.
09-02-2022 01:11 PM
Yes, all those are global variables.
This code was not working for me, but it gave me the clue. I converted the Spark DataFrames to pandas outside of the image generators, and then I moved the image generators inside the objective function. This way there was no need to pass the dataframes as arguments to the objective function. It is now working with SparkTrials parallelism 🙂
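For anyone hitting the same pickling error, the shape that worked for me looks roughly like this (condensed, validation generator omitted for brevity; names as in the thread above):

# On the driver, before tuning: convert Spark DataFrames once.
train_df_pd = train_df.toPandas()
test_df_pd = test_df.toPandas()

def CNN_HOF(params):  # objective now only captures picklable globals
    # Generators are created here, inside each trial, so nothing that
    # holds a thread lock is captured when SparkTrials pickles fn.
    train_generator = train_data_gen.flow_from_dataframe(
        dataframe=train_df_pd, directory=images_dir, x_col='filename',
        y_col=target, target_size=(150, 150),
        class_mode='categorical', batch_size=train_batch)
    test_generator = test_data_gen.flow_from_dataframe(
        dataframe=test_df_pd, directory=images_dir, x_col='filename',
        y_col=target, target_size=(150, 150), class_mode='categorical',
        batch_size=test_batch, shuffle=False, seed=42)
    model = model_builder(params, dense_size)
    model.compile(loss="categorical_crossentropy", optimizer=Adam(),
                  metrics=["accuracy"])
    model.fit(train_generator, steps_per_epoch=train_step,
              epochs=tuner_epochs, verbose=2)
    score = model.evaluate(test_generator, steps=1, verbose=0)
    return float(score[0])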
As for the score, it is a list of scalars, and score[0] is our test loss, which is a float. What did you receive when you cast it?