cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to set sparkTrials? I am receiving this TypeError: cannot pickle '_thread.lock' object

Somi
New Contributor III

I am trying to distribute hyperparameter tuning using hyperopt on a tensorflow.keras model. I am using sparkTrials in my fmin:

spark_trials = SparkTrials(parallelism=4)

...

best_hyperparam = fmin(fn=CNN_HOF, 

                 space=space, 

                 algo=tpe.suggest, 

                 max_evals=tuner_max_evals,

                  trials=spark_trials)

but I am receiving this error:

TypeError: cannot pickle '_thread.lock' object

the only way the code is working is skipping the trials passing by commenting out the line trials=spark_trials which means there would be no distributed tuning.

Any idea how can I fix this?

@Tian Tan​ 

@Sara Dooley​ 

1 ACCEPTED SOLUTION

Accepted Solutions

Dooley
Valued Contributor

Try the below:

def CNN_HOF(train_df_pd, valid_df_pd, test_df_pd, params): #Hyperopt objective function
    train_generator = train_data_gen.flow_from_dataframe(dataframe=train_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=train_batch)
     valid_generator = valid_data_gen.flow_from_dataframe(dataframe=valid_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=valid_batch,
                                                         shuffle=False,
                                                         seed=42)
    test_generator = test_data_gen.flow_from_dataframe(dataframe=test_df_pd,
                                                           directory=images_dir,
                                                           x_col='filename',
                                                           y_col=target,
                                                           target_size=(150, 150),
                                                           class_mode='categorical',
                                                           batch_size=test_batch,
                                                           shuffle=False,
                                                           seed=42)
    mlflow.tensorflow.autolog()
    model = model_builder(params,dense_size)
    model.compile(loss="categorical_crossentropy",
                optimizer=Adam(),
                metrics=["accuracy"])
 
    history = model.fit(train_generator,
                        steps_per_epoch=train_step,
                        epochs=tuner_epochs,
                        validation_data=valid_generator,
                        validation_steps=valid_step,
                        verbose=2)
  # Evaluate the model
    score = model.evaluate(test_generator, steps=1, verbose=0)
    obj_metric = score[0]
    return float(obj_metric)

I'm assuming your dense_size, valid_batch, train_batch, test_batch, image_dir, and target are global variables. Note that train_df_pd = train_df.toPandas(), valid_df_pd = valid_df.toPandas(), and test_df_pd = test_df.toPandas() are outside of the objective function and the pandas dataframes are being brought in as arguments. Then you put the generator in the objective function in here. If this works, then the generator was the issue and we can do something to speed up this process.

I also want to note that I am returning the score as a float. I'm wondering if that score[0] is a dictionary and not a float but I casted it but you can print that type to validate.

View solution in original post

10 REPLIES 10

Dooley
Valued Contributor

This can happen when you try to serialize a keras model with an unserializable layer. What does your model look like? Also what is in that search space variable? What are you trying to optimize on?

Somi
New Contributor III

This is more code and details:

space = {
      "pool_1": hp.choice('pool_1',np.arange(2, 5,1, dtype=int)),
      "conv_1": hp.choice('conv_1', np.arange(16, 128, 16,dtype=int)),
      "conv_1b": hp.choice('conv_1b', np.arange(2, 5, 1,dtype=int)),
 
      "pool_2": hp.choice('pool_2',np.arange(2, 5,1, dtype=int)), 
      "reg_2" : hp.choice('reg_2', np.arange(0.00005, 0.01, 0.00001, dtype=float)), 
      "conv_2": hp.choice("conv_2", np.arange(16, 128, 16, dtype=int)),
      "conv_2b": hp.choice("conv_2b", np.arange(2, 5, 1, dtype=int)),
 
      "pool_3": hp.choice('pool_3', np.arange(2, 5, 1, dtype=int)),  
      "reg_3" : hp.choice('reg_3', np.arange(0.00005, 0.01, 0.00001, dtype=float)), 
      "conv_3": hp.choice("conv_3", np.arange(16, 128, 16, dtype=int)),
      "conv_3b": hp.choice("conv_3b", np.arange(2, 5, 1, dtype=int)),
 
      "pool_4" : hp.choice('pool_4', np.arange(2, 5, 1, dtype=int)),   
      "reg_4" : hp.choice('reg_4', np.arange(0.00005, 0.01, 0.00001, dtype=float)),
      "conv_4": hp.choice("conv_4", np.arange(16, 128, 16, dtype=int)),
      "conv_4b": hp.choice("conv_4b",np.arange(2, 5, 1, dtype=int)),
      "drop_4": hp.choice('drop_4', np.arange(0.00005, 0.01, 0.00001, dtype=float)) 
    }

def model_builder(params,dense_size): #CNN builder function
    model = Sequential()
    model.add(Conv2D(int(params['conv_1']), (int(params['conv_1b']), int(params['conv_1b'])), activation='relu', input_shape=(150, 150, 3)))
    model.add(MaxPooling2D(int(params['pool_1']), int(params['pool_1'])))
    
    model.add(Conv2D(int(params['conv_2']), (int(params['conv_2b']), int(params['conv_2b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_2']), float(params['reg_2']))))
    model.add(MaxPooling2D(int(params['pool_2']), int(params['pool_2'])))
    
    model.add(Conv2D(int(params['conv_3']), (int(params['conv_3b']), int(params['conv_3b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_3']), float(params['reg_3']))))
    model.add(MaxPooling2D(int(params['pool_3']), int(params['pool_3'])))
    
    model.add(Conv2D(int(params['conv_4']), (int(params['conv_4b']), int(params['conv_4b'])), activation='relu', kernel_regularizer=L1L2(float(params['reg_4']), float(params['reg_4']))))
    model.add(MaxPooling2D(int(params['pool_4']), int(params['pool_4'])))
    
    model.add(Dropout(float(params['drop_4'])))
    
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dense(dense_size, activation='softmax'))
 
    return model

def CNN_HOF(params): #Hyperopt objective function
    mlflow.tensorflow.autolog()
    model = model_builder(params,dense_size)
    model.compile(loss="categorical_crossentropy",
                optimizer=Adam(),
                metrics=["accuracy"])
 
    history = model.fit(train_generator,
                        steps_per_epoch=train_step,
                        epochs=tuner_epochs,
                        validation_data=valid_generator,
                        validation_steps=valid_step,
                        verbose=2)
  # Evaluate the model
    score = model.evaluate(test_generator, steps=1, verbose=0)
    obj_metric = score[0]
    return {"loss": obj_metric, "status": STATUS_OK}
 
 
spark_trials = SparkTrials(parallelism=4)
...
with mlflow.start_run(run_name=model_name+"_Tuning"):
        best_hyperparam = fmin(fn=CNN_HOF,
                                 space=space,
                                 algo=tpe.suggest,
                                 max_evals=tuner_max_evals,
                                 early_stop_fn=no_progress_loss(10),
                                    trials=spark_trials)

This is the complete error message:

TypeError Traceback (most recent call last)

<command-2155252138731800> in <module>

1 if tuning:

----> 2 Hyperparameter_tuning(model_name)

<command-3238776031025884> in Hyperparameter_tuning(model_name)

2 with mlflow.start_run(run_name=model_name+"_Tuning"):

3 # mlflow.tensorflow.autolog()

----> 4 best_hyperparam = fmin(fn=CNN_HOF,

5 space=space,

6 algo=tpe.suggest,

/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)

563

564 if allow_trials_fmin and hasattr(trials, "fmin"):

--> 565 return trials.fmin(

566 fn,

567 space,

/databricks/.python_edge_libs/hyperopt/instrumentation.py in instrumented(func, self, args, kwargs)

25 )

26 try:

---> 27 return_val = func(*args, **kwargs)

28 except Exception as exc:

29 error_string = "{} with message: {}".format(type(exc).__name__, str(exc))

/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)

311 except BaseException as e:

312 logger.debug("fmin thread exits with an exception raised.")

--> 313 raise e

314 else:

315 logger.debug("fmin thread exits normally.")

/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)

283 )

284

--> 285 res = fmin(

286 fn,

287 space,

/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)

592 domain = base.Domain(fn, space, pass_expr_memo_ctrl=pass_expr_memo_ctrl)

593

--> 594 rval = FMinIter(

595 algo,

596 domain,

/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)

180 )

181 else:

--> 182 raise e

183 trials.attachments["FMinIter_Domain"] = msg

184

/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)

163 logger.warning("over-writing old domain trials attachment")

164 try:

--> 165 msg = pickler.dumps(domain)

166 except TypeError as e:

167 if "cannot pickle '_thread.RLock' object" in str(e):

/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)

71 file, protocol=protocol, buffer_callback=buffer_callback

72 )

---> 73 cp.dump(obj)

74 return file.getvalue()

75

/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)

561 def dump(self, obj):

562 try:

--> 563 return Pickler.dump(self, obj)

564 except RuntimeError as e:

565 if "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.lock' object

Somi
New Contributor III

This is a summary of the cluster:

2-8 Workers

32-128 GB Memory

8-32 Cores

1 Driver

16 GB Memory, 4 Cores

Runtime

10.5.x-gpu-ml-scala2.12 

(3) what is the value set for tuner_max_evals? Right now it's been set to 36, but with every value of this, we receive the error.

Dooley
Valued Contributor

From looking through your full error, it looks like the error is first with FMinIter before it tries to do a Pickler dump after that error which is what is throwing that specific error. I think the true error we are chasing here is the FMinIter error.

First, try to test the search space on something simpler (maybe only trying it only pool for one layer at first). Additionally, I would use hp.quniform with min >>= 1 such as this:

search_space = {
    "pool_1": hp.quniform("pool_1", 2, 5, 1),
}

Another thought is that you need to load your train_generator & test_generator in the objective function which I'm assuming could look like this:

train_generator = datagen.flow_from_directory(directory="/image/path", 
                                              class_mode="binary", 
                                              classes=["normal", "abnormal"],
                                              batch_size=batch_size,
                                              target_size=(img_height, img_width))

Dooley
Valued Contributor

Lastly, the output of the objective function might not be serializable by pickle. Double check that it is for sure a double that you are returning and not something more complex.

Somi
New Contributor III

I changed the code in the way you were suggesting. It was not working with sparktrials(). It only worked with Trials() which I believe means there is no distributed tuning as we don't call distributed training algorithms such as MLlib or Horovod and we only call single-machine algorithms.

How we can make sure if using Trials means distributed tuning or not?

You can take a look at the notebook at this address:

https://dbc-1dfc249d-eec7.cloud.databricks.com/?o=3298945606027707#notebook/1496814655941658/command...

Dooley
Valued Contributor

Sorry, I do not have access authority to your workspace.

So you had in the objective function that it just returned a simple double - the accuracy - and it threw the exact same error? Then the question I have is what does the generator look like?

Somi
New Contributor III

Using sparktrials I am receiving this error not the same error I was receiving before:

BadObjectiveFunction: When using `fmin` asynchronously, distributed algorithms or distributed objects may not be used within the objective function. This includes algorithms from Apache Spark ML and data objects like Spark DataFrames. In order to use Apache Spark in the objective function, use `Trials` instead of `SparkTrials`. To instead use `fmin` for single-machine ML like scikit-learn, make sure the objective function does not reference a Spark DataFrame or a distributed algorithm. See the following docs for more details on using Spark with Hyperopt: https://hyperopt.github.io/hyperopt/scaleout/spark

 
TypeError                                 Traceback (most recent call last)
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
    164             try:
--> 165                 msg = pickler.dumps(domain)
    166             except TypeError as e:
 
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
 
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
 
TypeError: cannot pickle '_thread.RLock' object
 
During handling of the above exception, another exception occurred:
 
BadObjectiveFunction                      Traceback (most recent call last)
<command-1496814655941666> in <module>
----> 1 Hyperparameter_tuning(model_name)
 
<command-1496814655941665> in Hyperparameter_tuning(model_name)
      2     with mlflow.start_run(run_name=model_name+"_Tuning"):
      3 #         mlflow.tensorflow.autolog()
----> 4         best_hyperparam = fmin(fn=CNN_HOF, 
      5                                  space=space,
      6                                  algo=tpe.suggest,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    563 
    564     if allow_trials_fmin and hasattr(trials, "fmin"):
--> 565         return trials.fmin(
    566             fn,
    567             space,
 
/databricks/.python_edge_libs/hyperopt/instrumentation.py in instrumented(func, self, args, kwargs)
     25     )
     26     try:
---> 27         return_val = func(*args, **kwargs)
     28     except Exception as exc:
     29         error_string = "{} with message: {}".format(type(exc).__name__, str(exc))
 
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
    311         except BaseException as e:
    312             logger.debug("fmin thread exits with an exception raised.")
--> 313             raise e
    314         else:
    315             logger.debug("fmin thread exits normally.")
 
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
    283             )
    284 
--> 285             res = fmin(
    286                 fn,
    287                 space,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    592     domain = base.Domain(fn, space, pass_expr_memo_ctrl=pass_expr_memo_ctrl)
    593 
--> 594     rval = FMinIter(
    595         algo,
    596         domain,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
    166             except TypeError as e:
    167                 if "cannot pickle '_thread.RLock' object" in str(e):
--> 168                     raise BadObjectiveFunction(
    169                         "When using `fmin` asynchronously, distributed algorithms or "
    170                         "distributed objects may not be used within the objective function. "

 When turning it to `Trials`, it is working but I doubt if it is distributed.

Image generator looks like this:

def img_generator(train_df,valid_df,test_df):
    train_df_count = train_df.count()
    result= json.loads(dbutils.notebook.run("Batch_step_size", 3600,{"dataframe_count":train_df_count }))
    train_batch=result['batch']
    train_step=result['step']
        
    img_prep_function=None
 
    if image_augmentation:
        train_data_gen = ImageDataGenerator(rescale=1.0/255,
                                            rotation_range=40,
                                            width_shift_range=0.2,
                                            height_shift_range=0.2,
                                            shear_range=2.0,
                                            zoom_range=0.2,
                                            horizontal_flip=True,
                                            fill_mode='nearest',
                                            preprocessing_function=img_prep_function)
    else:
        train_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
 
 
    train_generator = train_data_gen.flow_from_dataframe(dataframe=train_df.toPandas(),
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=train_batch)
    valid_df_count = valid_df.count()
    result= json.loads(dbutils.notebook.run("Batch_step_size", 3600,{"dataframe_count":valid_df_count }))
    valid_batch=result['batch']
    valid_step=result['step']
 
    valid_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
    valid_generator = valid_data_gen.flow_from_dataframe(dataframe=valid_df.toPandas(),
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=valid_batch,
                                                         shuffle=False,
                                                         seed=42)
 
 
 
    test_df_count = test_df.count()
    result= json.loads(dbutils.notebook.run("Batch_step_size", 3600,{"dataframe_count":test_df_count }))
    test_batch=result['batch']
    test_step=result['step']
 
 
    test_data_gen = ImageDataGenerator(rescale=1.0/255, preprocessing_function=img_prep_function)
    test_generator = test_data_gen.flow_from_dataframe(dataframe=test_df.toPandas(),
                                                           directory=images_dir,
                                                           x_col='filename',
                                                           y_col=target,
                                                           target_size=(150, 150),
                                                           class_mode='categorical',
                                                           batch_size=test_batch,
                                                           shuffle=False,
                                                           seed=42)
    return train_generator,train_step,train_batch,valid_generator,valid_step,test_generator,test_step

Dooley
Valued Contributor

Try the below:

def CNN_HOF(train_df_pd, valid_df_pd, test_df_pd, params): #Hyperopt objective function
    train_generator = train_data_gen.flow_from_dataframe(dataframe=train_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=train_batch)
     valid_generator = valid_data_gen.flow_from_dataframe(dataframe=valid_df_pd,
                                                         directory=images_dir,
                                                         x_col='filename',
                                                         y_col=target,
                                                         target_size=(150, 150),
                                                         class_mode='categorical',
                                                         batch_size=valid_batch,
                                                         shuffle=False,
                                                         seed=42)
    test_generator = test_data_gen.flow_from_dataframe(dataframe=test_df_pd,
                                                           directory=images_dir,
                                                           x_col='filename',
                                                           y_col=target,
                                                           target_size=(150, 150),
                                                           class_mode='categorical',
                                                           batch_size=test_batch,
                                                           shuffle=False,
                                                           seed=42)
    mlflow.tensorflow.autolog()
    model = model_builder(params,dense_size)
    model.compile(loss="categorical_crossentropy",
                optimizer=Adam(),
                metrics=["accuracy"])
 
    history = model.fit(train_generator,
                        steps_per_epoch=train_step,
                        epochs=tuner_epochs,
                        validation_data=valid_generator,
                        validation_steps=valid_step,
                        verbose=2)
  # Evaluate the model
    score = model.evaluate(test_generator, steps=1, verbose=0)
    obj_metric = score[0]
    return float(obj_metric)

I'm assuming your dense_size, valid_batch, train_batch, test_batch, image_dir, and target are global variables. Note that train_df_pd = train_df.toPandas(), valid_df_pd = valid_df.toPandas(), and test_df_pd = test_df.toPandas() are outside of the objective function and the pandas dataframes are being brought in as arguments. Then you put the generator in the objective function in here. If this works, then the generator was the issue and we can do something to speed up this process.

I also want to note that I am returning the score as a float. I'm wondering if that score[0] is a dictionary and not a float but I casted it but you can print that type to validate.

Somi
New Contributor III

Yes, all those are global variables.

This code was not working for me, but it gave me the clue. I changed spark dataframes to pandas outside of the image generators and then I moved Image generators inside of the objective function. This way there was no need to pass the dataframes as arguments to the objective function. It is now working with sparktrials parallelism 🙂

As for the score, it is a list of scalars and score[0] is our test_loss which is a float. What did you receive when you cast it?

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!