Somi
New Contributor III

Using `SparkTrials`, I am now receiving the error below, which is not the same error I was receiving before:

BadObjectiveFunction: When using `fmin` asynchronously, distributed algorithms or distributed objects may not be used within the objective function. This includes algorithms from Apache Spark ML and data objects like Spark DataFrames. In order to use Apache Spark in the objective function, use `Trials` instead of `SparkTrials`. To instead use `fmin` for single-machine ML like scikit-learn, make sure the objective function does not reference a Spark DataFrame or a distributed algorithm. See the following docs for more details on using Spark with Hyperopt: https://hyperopt.github.io/hyperopt/scaleout/spark

 
TypeError                                 Traceback (most recent call last)
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
    164             try:
--> 165                 msg = pickler.dumps(domain)
    166             except TypeError as e:
 
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
 
/databricks/python/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
 
TypeError: cannot pickle '_thread.RLock' object
 
During handling of the above exception, another exception occurred:
 
BadObjectiveFunction                      Traceback (most recent call last)
<command-1496814655941666> in <module>
----> 1 Hyperparameter_tuning(model_name)
 
<command-1496814655941665> in Hyperparameter_tuning(model_name)
      2     with mlflow.start_run(run_name=model_name+"_Tuning"):
      3 #         mlflow.tensorflow.autolog()
----> 4         best_hyperparam = fmin(fn=CNN_HOF, 
      5                                  space=space,
      6                                  algo=tpe.suggest,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    563 
    564     if allow_trials_fmin and hasattr(trials, "fmin"):
--> 565         return trials.fmin(
    566             fn,
    567             space,
 
/databricks/.python_edge_libs/hyperopt/instrumentation.py in instrumented(func, self, args, kwargs)
     25     )
     26     try:
---> 27         return_val = func(*args, **kwargs)
     28     except Exception as exc:
     29         error_string = "{} with message: {}".format(type(exc).__name__, str(exc))
 
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
    311         except BaseException as e:
    312             logger.debug("fmin thread exits with an exception raised.")
--> 313             raise e
    314         else:
    315             logger.debug("fmin thread exits normally.")
 
/databricks/.python_edge_libs/hyperopt/spark.py in fmin(self, fn, space, algo, max_evals, timeout, loss_threshold, max_queue_len, rstate, verbose, pass_expr_memo_ctrl, catch_eval_exceptions, return_argmin, show_progressbar, early_stop_fn, trials_save_file)
    283             )
    284 
--> 285             res = fmin(
    286                 fn,
    287                 space,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctrl, catch_eval_exceptions, verbose, return_argmin, points_to_evaluate, max_queue_len, show_progressbar, early_stop_fn, trials_save_file)
    592     domain = base.Domain(fn, space, pass_expr_memo_ctrl=pass_expr_memo_ctrl)
    593 
--> 594     rval = FMinIter(
    595         algo,
    596         domain,
 
/databricks/.python_edge_libs/hyperopt/fmin.py in __init__(self, algo, domain, trials, rstate, asynchronous, max_queue_len, poll_interval_secs, max_evals, timeout, loss_threshold, verbose, show_progressbar, early_stop_fn, trials_save_file)
    166             except TypeError as e:
    167                 if "cannot pickle '_thread.RLock' object" in str(e):
--> 168                     raise BadObjectiveFunction(
    169                         "When using `fmin` asynchronously, distributed algorithms or "
    170                         "distributed objects may not be used within the objective function. "

When I switch to `Trials` it works, but I doubt that the tuning is actually distributed.

The image generator looks like this:

def img_generator(train_df,valid_df,test_df):
    """Build the train, validation and test image generators.

    The three DataFrames are handled in order (train, valid, test). For each
    one the rows are counted, the "Batch_step_size" notebook is run with that
    count, and the 'batch' and 'step' entries are read from the JSON string
    the notebook returns. The DataFrame is then converted with ``toPandas()``
    and handed to ``flow_from_dataframe``.

    Names read from the enclosing scope: ``json``, ``dbutils``,
    ``ImageDataGenerator``, ``image_augmentation``, ``images_dir`` and
    ``target``.

    NOTE(review): ``count()`` / ``toPandas()`` suggest the inputs are Spark
    DataFrames, and ``dbutils`` is used directly -- presumably this has to run
    on the driver rather than inside a distributed objective; confirm.

    Args:
        train_df: DataFrame used for the training generator.
        valid_df: DataFrame used for the validation generator.
        test_df: DataFrame used for the test generator.

    Returns:
        A 7-tuple ``(train_generator, train_step, train_batch,
        valid_generator, valid_step, test_generator, test_step)``. The
        validation and test batch sizes are not part of the result.
    """

    def _batch_and_step(frame):
        # Count first, then ask the helper notebook for the sizes.
        row_total = frame.count()
        payload = json.loads(
            dbutils.notebook.run(
                "Batch_step_size", 3600, {"dataframe_count": row_total}
            )
        )
        batch_size = payload['batch']
        step_count = payload['step']
        return batch_size, step_count

    def _flow(data_gen, frame, batch_size, **extra):
        # Settings shared by all three splits; `extra` carries the
        # split-specific keywords (shuffle / seed) after the common ones.
        return data_gen.flow_from_dataframe(
            dataframe=frame.toPandas(),
            directory=images_dir,
            x_col='filename',
            y_col=target,
            target_size=(150, 150),
            class_mode='categorical',
            batch_size=batch_size,
            **extra,
        )

    # --- training split -------------------------------------------------
    train_batch, train_step = _batch_and_step(train_df)

    preprocess = None  # no custom preprocessing function is configured

    train_options = {"rescale": 1.0 / 255}
    if image_augmentation:
        # Augmentation is applied to the training generator only.
        train_options.update(
            rotation_range=40,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=2.0,
            zoom_range=0.2,
            horizontal_flip=True,
            fill_mode='nearest',
        )
    train_options["preprocessing_function"] = preprocess

    train_generator = _flow(
        ImageDataGenerator(**train_options), train_df, train_batch
    )

    # --- validation split -----------------------------------------------
    valid_batch, valid_step = _batch_and_step(valid_df)
    valid_generator = _flow(
        ImageDataGenerator(rescale=1.0 / 255, preprocessing_function=preprocess),
        valid_df,
        valid_batch,
        shuffle=False,
        seed=42,
    )

    # --- test split -----------------------------------------------------
    test_batch, test_step = _batch_and_step(test_df)
    test_generator = _flow(
        ImageDataGenerator(rescale=1.0 / 255, preprocessing_function=preprocess),
        test_df,
        test_batch,
        shuffle=False,
        seed=42,
    )

    return (
        train_generator,
        train_step,
        train_batch,
        valid_generator,
        valid_step,
        test_generator,
        test_step,
    )