How to solve cluster breakdown due to GC when training a pyspark.ml Random Forest

llvu
New Contributor III

I am trying to train and optimize a random forest. At first the cluster handles garbage collection fine, but after a couple of hours it breaks down because garbage collection time has gone up significantly.

The train_df has 6,365,018 records and 31 columns. Before splitting data_df into train and test DataFrames (all are Spark DataFrames), I write the data to a checkpoint location, optimize the number of partitions with spark.sql("""OPTIMIZE delta.`location`"""), and read the DataFrame back in.
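That write / OPTIMIZE / re-read step might look roughly like the sketch below (the checkpoint path is a placeholder, not the actual location):

# Sketch of the write / OPTIMIZE / re-read step described above
checkpoint_path = "dbfs:/tmp/checkpoints/data_df"  # placeholder path

data_df.write.format("delta").mode("overwrite").save(checkpoint_path)

# Compact the Delta files on storage before re-reading
spark.sql(f"OPTIMIZE delta.`{checkpoint_path}`")

data_df = spark.read.format("delta").load(checkpoint_path)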

Could someone help me with ways to optimize garbage collection even further? I can think of reducing the max_depth parameter, but I am struggling to fully understand the problem.

# Feature transforming
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
# Model and Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Used below but missing from the original snippet
import numpy as np
import mlflow
 
# data_df, n_cores, categorical_cols and max_cat_size are defined earlier in the notebook
num_partitions = data_df.rdd.getNumPartitions()
 
parallelism = np.floor((n_cores-16)/num_partitions).astype(int)
 
train_df, test_df = data_df.randomSplit([.8, .2], seed=42)
 
index_output_cols = [x + "Index" for x in categorical_cols]
 
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
 
numeric_cols = [field for (field, dataType) in train_df.dtypes if (((dataType == "double") |("int" in dataType)) & (field != "label"))]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
 
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")
 
stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)
 
pipeline_data = pipeline.fit(train_df)
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()
 
max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
subset_choices = ['0.1', '0.3', '0.5']
impurity_choices = ['gini', 'entropy']
 
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='f1')
 
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)
 
#Create grid that works with Spark Random Forest
param_grid = (ParamGridBuilder()
              .addGrid(classifier.maxDepth, max_depth_choices)
              .addGrid(classifier.numTrees, n_estimator_choices)
              .addGrid(classifier.featureSubsetStrategy, subset_choices)
              .addGrid(classifier.impurity, impurity_choices)
              .build()
             )
 
with mlflow.start_run(nested=True, run_name='CrossValidator'):
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid, 
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))
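As a possible follow-up (not part of the original snippet), the best-scoring parameter combination could be read out of the fitted CrossValidatorModel; avgMetrics is ordered the same way as the parameter grid:

# Hypothetical addition: inspect the parameter combination with the best mean F1
best_idx = int(np.argmax(cv_model.avgMetrics))
best_params = cv_model.getEstimatorParamMaps()[best_idx]
print(cv_model.avgMetrics[best_idx])
print({param.name: value for param, value in best_params.items()})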

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

Caching is expensive: it wants to save that data to memory and to disk (if there is no more space left in memory). I know that, in theory, it should improve things, but it can also make them worse. I would just use

scaled_train_data = pipeline_data.transform(train_df)

scaled_test_data = pipeline_data.transform(test_df)

Then I would analyze the number and size of the partitions of scaled_train_data and scaled_test_data, and repartition so that each partition is between 100 and 200 MB. I would set the number of partitions to a multiple of the cores on the workers (so, for example, with 16 cores on the workers we would have 64 partitions of about 150 MB each).

To analyze the partitioning, use df.rdd.getNumPartitions() in PySpark; df.rdd.partitions.length and df.rdd.partitions.size are the Scala equivalents.

To repartition use:

scaled_train_data = pipeline_data.transform(train_df).repartition(optimal_number)

scaled_test_data = pipeline_data.transform(test_df).repartition(optimal_number)
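One way to estimate optimal_number is from the table size on storage; this is only a rough sketch (the 128 MB target, the worker core count, and reuse of the delta.`location` path are assumptions, and the in-memory size can be larger than the compressed size on disk):

import math

# Size of the Delta table on disk, as reported by DESCRIBE DETAIL
detail = spark.sql("DESCRIBE DETAIL delta.`location`").collect()[0]
target_bytes = 128 * 1024 * 1024  # aim for roughly 100-200 MB per partition

optimal_number = max(1, math.ceil(detail["sizeInBytes"] / target_bytes))

# Optionally round up to a multiple of the worker core count, as suggested above
num_worker_cores = 16  # assumption: adjust to your cluster
optimal_number = math.ceil(optimal_number / num_worker_cores) * num_worker_cores

# Sanity check: rows per partition (can be expensive on large data)
scaled_train_data.rdd.glom().map(len).collect()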

llvu
New Contributor III

Thank you for the fast reply, let me try this out!

I was indeed under the impression that caching the DataFrame would improve performance rather than make it worse.

Would you happen to know why optimizing the saved tables does not give the optimal number of partitions? Or does it give the optimal number of partitions for storage rather than for computation?

Hubert-Dudek
Esteemed Contributor III

Yes, the optimization you mentioned is related to storage, so it only speeds up loading from storage, before any transformations are made. After that, you need to manage the partitions that are created on the cluster once transform() has run.
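A small illustrative sketch of that distinction (the path and optimal_number are placeholders from the earlier reply):

# The file layout produced by OPTIMIZE only affects how the data is read in
raw_df = spark.read.format("delta").load("location")
print(raw_df.rdd.getNumPartitions())  # driven by the on-storage file layout

# Training sees the partitioning of the transformed DataFrame, so set it there
scaled = pipeline_data.transform(raw_df).repartition(optimal_number)
print(scaled.rdd.getNumPartitions())  # now equal to optimal_number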

Kaniz
Community Manager

Hi @Liselotte van Unen, we haven't heard from you since the last response from @Hubert Dudek, and I was checking back to see if his suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
