How to solve cluster breakdown due to GC when training a pyspark.ml Random Forest

llvu
New Contributor III

I am trying to train and optimize a random forest. At first the cluster handles garbage collection fine, but after a couple of hours it breaks down because garbage-collection time has gone up significantly.

train_df has 6,365,018 records and 31 columns. Before splitting data_df into train and test DataFrames (all are Spark DataFrames), I write it to a checkpoint location, optimize the number of partitions with spark.sql("""optimize delta.`location`"""), and read the DataFrame back in.

Could someone suggest ways to reduce garbage-collection overhead even further? One option I can think of is reducing the max_depth parameter, but I am struggling to fully understand the problem.

# Feature transforming
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
# Model and Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
 
# Number of Spark partitions (and so tasks) each model fit fans out over.
num_partitions = data_df.rdd.getNumPartitions()

# How many CrossValidator model fits to run concurrently: the cores left after
# reserving 16 (reason for the reservation is not visible here) divided by the
# tasks a single fit launches. Clamped to at least 1: CrossValidator sizes a
# thread pool with this value, and a pool of 0 (what the unclamped formula gives
# whenever n_cores - 16 < num_partitions) raises instead of training.
# NOTE(review): every concurrent fit holds its own tree-training state, so a
# larger value also multiplies memory pressure; lowering it is a cheap lever
# when garbage collection builds up during the search.
parallelism = max(1, int(np.floor((n_cores - 16) / num_partitions)))
 
# 80/20 train/test split; the fixed seed makes it reproducible for a given input.
train_df, test_df = data_df.randomSplit([.8, .2], seed=42)

# Feature pipeline: index categoricals -> assemble one vector -> min-max scale.
index_output_cols = [x + "Index" for x in categorical_cols]

# handleInvalid="skip" filters out rows with invalid values (e.g. a category
# not seen while fitting), so transforming test_df can silently drop rows.
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

# Numeric features: "double" columns plus any column whose type name contains
# "int" (int, bigint, smallint, tinyint, ...), excluding the label. Logical
# `or`/`and` rather than bitwise `|`/`&`: identical on bools, but short-circuits
# and reads as boolean logic.
numeric_cols = [
    field
    for field, data_type in train_df.dtypes
    if (data_type == "double" or "int" in data_type) and field != "label"
]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# NOTE(review): tree ensembles split on the ordering of feature values, so
# min-max scaling should not change what the forest can learn; this stage mainly
# adds one more vector column to the cached data. Consider dropping it and
# pointing the classifier at "features" -- also verify whether it discards the
# categorical metadata StringIndexer attaches to the assembled vector.
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")

stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)

# Fit the feature stages once, on the training split only, and reuse them for
# both splits; CrossValidator below then only refits the classifier per fold.
pipeline_data = pipeline.fit(train_df)

# NOTE(review): each output keeps every input column and adds the index
# columns, "features" and "ScaledFeatures"; selecting only the columns the model
# needs (e.g. "ScaledFeatures", "label") before .cache() would shrink the cached
# footprint. scaled_test_data is not used in this snippet, so caching it here
# only holds memory until it is needed.
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()
 
# Hyper-parameter search space: 3 * 3 * 3 * 2 = 54 combinations. CrossValidator
# fits each one once per fold, then refits the best one on the full training data.
# NOTE(review): maxDepth=20 allows up to 2**20 leaves per tree; combined with up
# to 120 trees this is by far the most memory-hungry corner of the grid and a
# likely driver of GC pressure. Capping the depth is the first thing to try.
max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
# featureSubsetStrategy accepts a fraction in (0, 1] given as a string.
subset_choices = ['0.1', '0.3', '0.5']
impurity_choices = ['gini', 'entropy']

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='f1')

# NOTE(review): presumably max_cat_size is the largest categorical cardinality;
# Spark requires maxBins >= that for features it treats as categorical, and a
# larger maxBins also grows the split statistics aggregated per tree node.
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)

# Create grid that works with Spark Random Forest
param_grid = (
    ParamGridBuilder()
    .addGrid(classifier.maxDepth, max_depth_choices)
    .addGrid(classifier.numTrees, n_estimator_choices)
    .addGrid(classifier.featureSubsetStrategy, subset_choices)
    .addGrid(classifier.impurity, impurity_choices)
    .build()
)
 
with mlflow.start_run(nested=True, run_name='CrossValidator'):
    # Enable autologging before fit() so this cross-validation is captured.
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid,
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    # avgMetrics holds one fold-averaged score per entry of param_grid. The best
    # score is the max for a larger-is-better metric such as f1 (min otherwise);
    # averaging over the whole grid would not be the best score. A score belongs
    # in an MLflow metric, not a param.
    # NOTE(review): despite the key name, this is a cross-validation (held-out
    # fold) score, not a training-set score.
    pick_best = max if evaluator.isLargerBetter() else min
    mlflow.log_metric("best_train", pick_best(cv_model.avgMetrics))