01-05-2023 02:14 AM
I am trying to train and optimize a random forest. At first the cluster handles garbage collection fine, but after a couple of hours the cluster breaks down because garbage collection time has gone up significantly.
The train_df has 6,365,018 records and 31 columns. Before splitting data_df into train and test DataFrames (all are Spark DataFrames), I write it to a checkpoint location, optimize the number of partitions with spark.sql("""OPTIMIZE delta.`location`"""), and read the DataFrame in again.
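Roughly, that pre-split step looks like this (the path and write options are illustrative):
checkpoint_path = "/mnt/checkpoints/data_df"  # placeholder location
data_df.write.format("delta").mode("overwrite").save(checkpoint_path)
spark.sql(f"OPTIMIZE delta.`{checkpoint_path}`")
data_df = spark.read.format("delta").load(checkpoint_path)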
Could someone help me with ways to reduce the garbage collection load even further? I can think of reducing the maxDepth parameter, but I am struggling to fully understand the problem.
# Feature transforming
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Model and Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Tracking and numerics (used below)
import mlflow
import numpy as np
# Derive CrossValidator parallelism from the available cores (n_cores defined elsewhere) and the partition count
num_partitions = data_df.rdd.getNumPartitions()
parallelism = np.floor((n_cores - 16) / num_partitions).astype(int)
train_df, test_df = data_df.randomSplit([.8, .2], seed=42)
index_output_cols = [x + "Index" for x in categorical_cols]
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
numeric_cols = [field for (field, dataType) in train_df.dtypes if (((dataType == "double") |("int" in dataType)) & (field != "label"))]
assembler_inputs = index_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="ScaledFeatures")
stages = [string_indexer, vec_assembler, scaler]
pipeline = Pipeline(stages=stages)
pipeline_data = pipeline.fit(train_df)
scaled_train_data = pipeline_data.transform(train_df).cache()
scaled_test_data = pipeline_data.transform(test_df).cache()
max_depth_choices = [5, 10, 20]
n_estimator_choices = [80, 100, 120]
subset_choices = ['0.1', '0.3', '0.5']
impurity_choices = ['gini', 'entropy']
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='f1')
classifier = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label", seed=42, maxBins=max_cat_size)
# Create grid that works with Spark Random Forest
param_grid = (ParamGridBuilder()
              .addGrid(classifier.maxDepth, max_depth_choices)
              .addGrid(classifier.numTrees, n_estimator_choices)
              .addGrid(classifier.featureSubsetStrategy, subset_choices)
              .addGrid(classifier.impurity, impurity_choices)
              .build()
              )
with mlflow.start_run(nested=True, run_name='CrossValidator'):
    mlflow.autolog(log_models=True, log_model_signatures=False)
    cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid,
                        numFolds=3, seed=42, parallelism=parallelism)
    cv_model = cv.fit(scaled_train_data)
    mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))
01-05-2023 02:38 AM
Caching is expensive: it wants to save that data to memory and spills to disk if there is no more space left in memory. I know that, in theory, it should improve performance, but it can make things worse. I would just use:
scaled_train_data = pipeline_data.transform(train_df)
scaled_test_data = pipeline_data.transform(test_df)
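For reference, DataFrame.cache() persists with the MEMORY_AND_DISK storage level, so blocks that do not fit in memory spill to disk. If you do keep the cache, at least release it explicitly once training is finished:
scaled_train_data.unpersist()
scaled_test_data.unpersist()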
Then I would analyze the number and size of the partitions of scaled_train_data and scaled_test_data, and repartition so that each partition is between 100 and 200 MB. I would set the number of partitions to a multiple of the number of cores on the workers (for example, with 16 cores on the workers, 64 partitions of about 150 MB each); see the sketch after the repartition snippet below.
To analyze, use (the first is PySpark, the other two are the Scala API):
df.rdd.getNumPartitions()
df.rdd.partitions.length
df.rdd.partitions.size
To repartition use:
scaled_train_data = pipeline_data.transform(train_df).repartition(optimal_number)
scaled_test_data = pipeline_data.transform(test_df).repartition(optimal_number)
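As a rough sketch of how optimal_number could be chosen for the calls above (the core count and the size estimate are assumptions; measure your own, e.g. from the sizeInBytes that DESCRIBE DETAIL reports for the source Delta table):
worker_cores = 16            # assumption: total cores across the workers
target_partition_mb = 150    # middle of the 100-200 MB range
approx_size_mb = 10_000      # assumption: estimated size of scaled_train_data in MB
raw = max(1, round(approx_size_mb / target_partition_mb))
optimal_number = -(-raw // worker_cores) * worker_cores  # round up to a multiple of the core count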
01-05-2023 02:45 AM
Thank you for the fast reply, let me try this out!
I was indeed under the impression that caching the DataFrame would improve performance rather than make it worse.
Would you happen to know why optimizing the saved tables does not give the optimal number of partitions? Or does it give the optimal number of partitions for storage rather than for computation?
01-05-2023 02:53 AM
Yes, the OPTIMIZE you mentioned is related to storage: it only speeds up loading from storage, before any transformations are made. You need to manage the partitions that are created on the cluster after transform() is called.
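To make that concrete (the table path is a placeholder):
spark.sql("OPTIMIZE delta.`/path/to/table`")                 # compacts the files on storage only
data_df = spark.read.format("delta").load("/path/to/table")
print(data_df.rdd.getNumPartitions())                        # partitioning driven by the file layout
scaled_train_data = pipeline_data.transform(train_df)
print(scaled_train_data.rdd.getNumPartitions())              # can differ after transformations
scaled_train_data = scaled_train_data.repartition(optimal_number)  # so tune it here, on the cluster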