PySpark - Can't run DecisionTreeClassifier every time

Alison7759
New Contributor

Hello community,

It's my first time here, and my English is not great, so sorry for any mistakes 😉

I want to build a decision tree in PySpark on training data and then evaluate it on testing data. My target is a variable with two values ("one", "two").

First, I apply a StringIndexer to my target.

Then, for each categorical column, I apply a StringIndexer followed by a OneHotEncoder.

Then I combine the quantitative columns with the transformed categorical columns in a VectorAssembler.

After that, I create my pipeline, fit it on my FULL database, and then transform that database.

Right after these steps, I do a random split (training: 70%, testing: 30%), fit the DecisionTreeClassifier on the training data, and evaluate the model on the testing data.

BUT: sometimes it works and sometimes it doesn't, and I don't know why. The failures seem random and I can't figure out the cause.

This is my error log:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 6389.0 failed 4 times, most recent failure: Lost task 13.3 in stage 6389.0 (TID 85481) (10.0.3.8 executor 12): org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$1837/1490332726: (string) => double)

The same thing happens when I tune my model...

Thank you in advance for your help!!! Have a nice day 🙂

This is my code:
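
For context on the error log above: the function that fails is the transform step of a StringIndexerModel, which maps each string value to a numeric index. The sketch below is a toy pure-Python model of that mapping, not Spark's implementation, and the names `fit_string_index` and `transform_string_index` are hypothetical. It assumes the documented behaviour of the `handleInvalid` parameter (`error`, `skip`, `keep`) and shows how a null or a value that was not seen during fit makes the default `error` mode raise.

```python
from collections import Counter


def fit_string_index(values):
    """Toy stand-in for StringIndexer.fit: the most frequent label gets index 0."""
    # Nulls are ignored when building the label-to-index mapping.
    counts = Counter(v for v in values if v is not None)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_string_index(mapping, values, handle_invalid="error"):
    """Toy stand-in for StringIndexerModel.transform."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "error":
            # Default mode: a null or unseen value aborts the transform.
            raise ValueError(f"Unseen or null label: {v!r}")
        elif handle_invalid == "keep":
            # Invalid values go to one extra bucket after the known labels.
            out.append(float(len(mapping)))
        elif handle_invalid == "skip":
            # Rows with invalid values are dropped.
            continue
        else:
            raise ValueError(f"Unknown handle_invalid mode: {handle_invalid!r}")
    return out


mapping = fit_string_index(["one", "two", "one", "one"])
clean = transform_string_index(mapping, ["one", "two"])
kept = transform_string_index(mapping, ["one", None, "three"], handle_invalid="keep")
skipped = transform_string_index(mapping, ["one", None, "three"], handle_invalid="skip")
```

In PySpark the corresponding setting would be `StringIndexer(..., handleInvalid="keep")`. Whether nulls or unseen values are actually the cause here is an assumption that would need to be checked against the data.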

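#IMPORTS :
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

#MY VARIABLES :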
categoricalColumns = ["var1","var2"]
numericCols= ["var3","var4"]
 
#MY TARGET :
stages = [] 
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")
stages += [label_stringIdx]
 
#CATEGORICAL COLUMNS :
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "_Index")
    encoder = OneHotEncoder(inputCol=categoricalCol + "_Index", outputCol=categoricalCol + "_classVec")
    stages += [stringIndexer, encoder]
 
#ADD THE QUANTITATIVE COLUMNS :
assemblerInputs = [c + "_classVec" for c in categoricalColumns] + numericCols
 
#ASSEMBLER :
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
 
#PIPELINE
pipeline = Pipeline().setStages(stages)
pipeline_fitted = pipeline.fit(my_database)
my_database_transf = pipeline_fitted.transform(my_database)
cols = my_database.columns
selectedcols = ["label", "features"] + cols
final_dataset = my_database_transf.select(selectedcols)
 
#RANDOM SPLIT FOR TRAINING AND TESTING DATA
(trainingData, testingData) = final_dataset.randomSplit([0.7, 0.3], seed=100)
 
 
 
#CONSTRUCTION OF THE DECISION TREE ON THE TRAINING 
dt = DecisionTreeClassifier(labelCol="label",featuresCol="features",impurity='gini',maxDepth=4)
dtModel = dt.fit(trainingData) #SOMETIMES I HAVE THE PROBLEM HERE (BUT NOT EVERY TIME)
 
#EVALUATE THE MODEL ON THE TESTING DATA :
predict_test = dtModel.transform(testingData)
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") # Default metricName="areaUnderROC"
#evaluatorAUC.evaluate(dtModel), evaluatorAUC.evaluate(predict_test) #SOMETIMES I HAVE THE PROBLEM HERE (BUT NOT EVERY TIME)
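print('AUC (BinaryClassificationEvaluator):', dtevaluator.evaluate(predict_test)) # metric is areaUnderROC, not accuracy
# BinaryClassificationMetrics expects an RDD of (score, label) pairs of floats
scoreAndLabels = predict_test.select('prediction', 'label').rdd.map(lambda row: (float(row[0]), float(row[1])))
print('AUC (BinaryClassificationMetrics):', BinaryClassificationMetrics(scoreAndLabels).areaUnderROC)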
 
 
#THE TUNING 
# Create ParamGrid for Cross Validation
dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.maxDepth, [2, 3, 5])
               .addGrid(dt.maxBins, [4, 5, 6, 7, 8])
               .build())
 
# Evaluate model
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
 
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt, 
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)
 
# Run cross validations
cv_dtModel = dtcv.fit(trainingData) #SOMETIMES I HAVE THE PROBLEM HERE (BUT NOT EVERY TIME)
 
# Prediction
predict_train = cv_dtModel.transform(trainingData)
predict_test = cv_dtModel.transform(testingData)
 
# Evaluate model
evaluatorAUC = BinaryClassificationEvaluator() # Default metricName="areaUnderROC"
evaluatorACC= MulticlassClassificationEvaluator(metricName="accuracy")
 
Best_DT_AUC = evaluatorAUC.evaluate(predict_train) #I ALWAYS HAVE THE PROBLEM HERE
Best_DT_ACC = evaluatorACC.evaluate(predict_test) #I ALWAYS HAVE THE PROBLEM HERE
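
Since the code above computes both areaUnderROC and accuracy, here is a minimal pure-Python sketch of how the two metrics differ. It is an illustration on made-up toy values, not Spark's implementation; the helper names `accuracy` and `area_under_roc` and the 0.5 threshold are assumptions for this example. AUC is computed as the fraction of (positive, negative) pairs in which the positive row gets the higher score, with ties counting as half.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def area_under_roc(labels, scores):
    """Chance that a random positive is scored above a random negative."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Toy data (made up for illustration): true labels and continuous scores.
labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.6, 0.1]

# Hard 0/1 predictions obtained by thresholding the scores at 0.5.
hard_predictions = [1 if s >= 0.5 else 0 for s in scores]

acc = accuracy(labels, hard_predictions)
auc_from_scores = area_under_roc(labels, scores)
auc_from_predictions = area_under_roc(labels, hard_predictions)
```

On this toy data the AUC computed from continuous scores differs from the AUC computed from hard 0/1 predictions, which is worth keeping in mind when the `prediction` column is used as the score.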
 
