I am trying to train and optimize a random forest. At first the cluster handles garbage collection fine, but after a couple of hours it breaks down because garbage collection time has gone up significantly.
The train_df has 6,365,018 records and 31 columns. Before splitting data_df into train and test DataFrames (all are Spark DataFrames), I write it to a checkpoint location, optimize the number of partitions with spark.sql("""OPTIMIZE delta.`location`"""), and read the DataFrame back in.
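Roughly, that checkpoint step looks like this (the Delta path below is a placeholder for my actual location):

# Write data_df to a Delta checkpoint location (placeholder path), compact/optimize it, and read it back
data_df.write.format("delta").mode("overwrite").save("/mnt/checkpoints/data_df")
spark.sql("OPTIMIZE delta.`/mnt/checkpoints/data_df`")
data_df = spark.read.format("delta").load("/mnt/checkpoints/data_df")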
Could someone help me with ways to optimize garbage collection even further? I can think of reducing the maxDepth parameter, but I am struggling to fully understand the problem.
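I know there are Spark-level knobs as well; this is the kind of session/executor configuration I could experiment with, but the values below are only placeholders and I am not sure which of them actually matter here (they would normally have to be set at cluster creation / spark-submit time):

from pyspark.sql import SparkSession

# Sketch only: possible GC-related settings, placeholder values
spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")                      # placeholder executor heap size
         .config("spark.memory.fraction", "0.6")                     # unified memory fraction (0.6 is the default)
         .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")  # use the G1 garbage collector on executors
         .getOrCreate())

The rest of this post is my current training code: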
import numpy as np
import mlflow
# Feature transforming
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler
# Model and Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Derive CrossValidator parallelism from the cluster size (n_cores is defined elsewhere in my notebook)
num_partitions = data_df.rdd.getNumPartitions()
parallelism = int(np.floor((n_cores - 16) / num_partitions))
train_df, test_df = data_df.randomSplit([.8, .2], seed=42)
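# Index the categorical columns (categorical_cols is defined upstream), assemble all features into a vector, and scale them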
index_output_cols = [x + "Index" for x in categorical_cols]
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
numeric_cols = [field for (field, dataType) in train_df.dtypes
                if ((dataType == "double") or ("int" in dataType)) and (field != "label")]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")
stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)
pipeline_data = pipeline.fit(train_df)
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()
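# Hyperparameter search space for the random forest (featureSubsetStrategy takes fractions passed as strings)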
max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
subset_choices = ['0.1', '0.3', '0.5']
impurity_choices = ['gini', 'entropy']
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='f1')
# max_cat_size (maximum number of categories) is computed elsewhere in my notebook
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)
# Create grid that works with Spark Random Forest
param_grid = (ParamGridBuilder()
.addGrid(classifier.maxDepth, max_depth_choices)
.addGrid(classifier.numTrees, n_estimator_choices)
.addGrid(classifier.featureSubsetStrategy, subset_choices)
.addGrid(classifier.impurity, impurity_choices)
.build()
)
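# Run 3-fold cross-validation over the grid and track the run with MLflow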
with mlflow.start_run(nested=True, run_name='CrossValidator'):
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid,
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    # Log the mean cross-validated f1 across the parameter grid
    mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))