cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to solve cluster break down due to GC when training a pyspark.ml Random Forest

llvu
New Contributor III

I am trying to train and optimize a random forest. At first the cluster handles the garbage collection fine, but after a couple of hours the cluster breaks down as Garbage Collection has gone up significantly.

The train_df has a size of 6,365,018 records with 31 number of columns. Before splitting data_df into train and test dataframes (all are spark dataframes) I write them to a checkpoint location, optimize the number of partitions with spark.sql("""optimize delta.`location`""") and read in the dataframe again.

Could someone help me with ways to optimize Garbage Collection even further? I can think of reducing the max_depth parameter, but am struggling to fully understand the problem.

# Feature transforming
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer, VectorAssembler
 
# Model and Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
 
num_partitions = data_df.rdd.getNumPartitions()
 
parallelism = np.floor((n_cores-16)/num_partitions).astype(int)
 
train_df, test_df = data_df.randomSplit([.8, .2], seed=42)
 
index_output_cols = [x + "Index" for x in categorical_cols]
 
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
 
numeric_cols = [field for (field, dataType) in train_df.dtypes if (((dataType == "double") |("int" in dataType)) & (field != "label"))]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
 
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")
 
stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)
 
pipeline_data = pipeline.fit(train_df)
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()
 
max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
subset_choices = ['0.1', '0.3', '0.5']
impurtiy_choices = ['gini', 'entropy']
 
evaluator= MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='f1')
 
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)
 
#Create grid that works with Spark Random Forest
param_grid = (ParamGridBuilder()
              .addGrid(classifier.maxDepth, max_depth_choices)
              .addGrid(classifier.numTrees, n_estimator_choices)
              .addGrid(classifier.featureSubsetStrategy, subset_choices)
              .addGrid(classifier.impurity, impurtiy_choices)
              .build()
             )
 
with mlflow.start_run(nested=True, run_name='CrossValidator'):
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid, 
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))

3 REPLIES 3

Hubert-Dudek
Esteemed Contributor III

The cache is expensive and wants to save that data to memory and disk (id there is no more space left in memory). I know that, in theory, it should improve, but it can make things worse. I would just put

scaled_train_data = pipeline_data.transform(train_df)

scaled_test_data = pipeline_data.transform(test_df)

Then I would analyze the number and size of partitions of scaled_train_data and scaled_test_data and then repartition so each partition would be between 100 and 200 MB. The number of partitions I would set as a multiplier of cores on workers (so, for example, 16 cores on workers, and we have 64 partitions 150 MB each).

To analyze use:

df.rdd.getNumPartitions

df.rdd.partitions.length

df.rdd.partitions.size

To repartition use:

scaled_train_data = pipeline_data.transform(train_df).repartition(optimal_number)

scaled_test_data = pipeline_data.transform(test_df).repartition(optimal_number)

llvu
New Contributor III

Thank you for the fast reply, let me try this out!

I indeed was under the impression that caching the dataframe would improve performance instead of making it worse.

Would you happen to know why optimizing the saved tables does not give the optimal number of partitions? Or does it give the optimal number of partitions in terms of storage and not computational purposes?

Hubert-Dudek
Esteemed Contributor III

Yes, the optimization you mentioned is related to storage (so it speeds up loading from storage only before any transformations are made you need to manipulate portions which are create on cluster after transform() is made)

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group