Tuning `CrossValidator` spark job performance

mradassaad
New Contributor III

I am running a 3-fold cross validation of an ML pipeline that utilizes `GBTClassifier` as the final step. It takes 18 hours to run and I am looking for feedback into how to improve the performance as I expect this to go faster.

For context here is the cluster configuration:

{
    "autoscale": {
        "min_workers": 1,
        "max_workers": 4
    },
    "cluster_name": "model_training",
    "spark_version": "10.3.x-cpu-ml-scala2.12",
    "spark_conf": {
        "spark.conf.set(\"spark.databricks.io.cache.enabled\",": "\"true\")",
        "spark.databricks.delta.preview.enabled": "true"
    },
    "azure_attributes": {
        "first_on_demand": 1,
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1
    },
    "node_type_id": "Standard_E8_v3",
    "driver_node_type_id": "Standard_E8_v3",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {},
    "autotermination_minutes": 60,
    "enable_elastic_disk": true,
    "cluster_source": "UI",
    "init_scripts": [],
    "cluster_id": "0224-222219-94q7zutd"
}

The below defines the pipeline:

# Define feature assembled
assembler = VectorAssembler(inputCols=feature_layers, outputCol="assembled", handleInvalid='keep')
 
# Define polynomial expansion on features
px = PolynomialExpansion(degree=3, inputCol="assembled", outputCol="expanded")
 
# Define standard scaler
standardScaler = StandardScaler(withMean=True, withStd=True, inputCol="expanded", outputCol="features")
 
# Define gradient boosted tree classifier
gbt = GBTClassifier(stepSize=0.3, maxIter=50)
# Define pipeline
pipeline = Pipeline(stages=[assembler, px, standardScaler, gbt])

The parameter grid contains 3 parameter values for `maxDepth`. Cross Validator uses 3 folds and a parallelism of 3.

paramGrid = (ParamGridBuilder().
    addGrid(gbt.maxDepth, [7, 10, 15]).
   build())
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'),
                          numFolds=3, collectSubModels=False, parallelism=3) 
 
gbt_pipe = crossval.fit(train)

Train is obtained from a dataset that has been repartitioned into 500 partitions and then 'cached':

df = df.na.drop().repartition(500)
df.cache()
 
df_label = df \
      .withColumn("label", F.when(df["threshold_runoff"] < runoff_amount, 1).otherwise(0)) .drop("threshold_runoff")
 
train, test = df_label.randomSplit([0.95, 0.05])

Here is a screenshot of one of the jobs:

Random Forest Job 

Job Summary:

Random Forest Job Summary 

Top half of storage tab:

GBT storage top half 

How would you go about diagnosing this issue? Any tips for performance improvements here?

I have tried increasing and decreasing the number of partitions but for a different algorithm (Logistic Regression) and 500 partitions seemed to worked quite well on the same dataset. I appreciate your input!