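Hi @THIAM_HUATTAN,
The error message "Output column features already exists" occurs because the DataFrame already contains the `VectorAssembler` output column `features` before `pipeline.fit()` is called. The earlier manual call to `vector_assembler.transform(df)` added that column, and the pipeline then runs the same `VectorAssembler` stage a second time.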
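If you want to confirm this before fitting, here is a minimal sketch of a check. Note that `find_output_collisions` is a hypothetical helper of my own, not part of PySpark. It only compares column names, so it runs on plain lists here.

```python
def find_output_collisions(existing_columns, output_columns):
    """Return the stage output column names that already exist in the input."""
    existing = set(existing_columns)
    return [name for name in output_columns if name in existing]


# Column names as they would look after df = vector_assembler.transform(df)
columns_after_manual_transform = ["bedrooms", "square_footage", "price", "features"]
print(find_output_collisions(columns_after_manual_transform, ["features"]))  # ['features']

# Column names of the untouched DataFrame
print(find_output_collisions(["bedrooms", "square_footage", "price"], ["features"]))  # []
```

With a real DataFrame, the call would look like `find_output_collisions(df.columns, [vector_assembler.getOutputCol()])`. A non-empty result means the pipeline would hit the error above.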
Here's what you can do to fix the issue:
- Remove the following line from the code: `df = vector_assembler.transform(df)`
- Split the original, untransformed DataFrame `df` into `train_data` and `test_data`.
- Pass `train_data` to the `fit()` method of the pipeline object and `test_data` to `model.transform()`, so that the `VectorAssembler` stage inside the pipeline is the only place where `features` is created.
Here's the modified code:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import numpy as np
# Create a Spark session
spark = SparkSession.builder.appName("MLlibExample").getOrCreate()
# Generate a toy dataset for illustration
np.random.seed(42)
num_samples = 1000
# Features: number of bedrooms, square footage
data = [(np.random.randint(1, 5), 100 + 50 * np.random.rand(), 150 + 75 * np.random.randint(1, 5) + 0.1 * (100 + 50 * np.random.rand()) + 10 * np.random.randn())
for _ in range(num_samples)]
# Create a DataFrame
df = spark.createDataFrame(data, ["bedrooms", "square_footage", "price"])
# Create a feature vector
feature_cols = ["bedrooms", "square_footage"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Split the raw data into training and testing sets.
# Do not call vector_assembler.transform() here; the pipeline applies it.
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)
# Build a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="price")
# Create a pipeline
pipeline = Pipeline(stages=[vector_assembler, lr])
# Train the model
model = pipeline.fit(train_data)
# Make predictions on the test set
predictions = model.transform(test_data)
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error on Test Set: {mse}")

The error message indicates that an output column named "features" already exists. You have already transformed the DataFrame with the `VectorAssembler()`, which added a column named "features", so the pipeline fails when it tries to transform the DataFrame again.
To fix this issue, you can give the output column of the `VectorAssembler()` a new name and use that name in your pipeline instead of "features".
Here's an example:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import numpy as np
# Create a Spark session
spark = SparkSession.builder.appName("MLlibExample").getOrCreate()
# Generate a toy dataset for illustration
np.random.seed(42)
num_samples = 1000
# Features: number of bedrooms, square footage
data = [(np.random.randint(1, 5), 100 + 50 * np.random.rand(), 150 + 75 * np.random.randint(1, 5) + 0.1 * (100 + 50 * np.random.rand()) + 10 * np.random.randn()) for _ in range(num_samples)]
# Create a DataFrame
df = spark.createDataFrame(data, ["bedrooms", "square_footage", "price"])
# Create a feature vector under the new column name "my_features"
feature_cols = ["bedrooms", "square_footage"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="my_features")
# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)
# Build a Linear Regression model that reads the "my_features" column
lr = LinearRegression(featuresCol="my_features", labelCol="price")
# Create a pipeline
pipeline = Pipeline(stages=[vector_assembler, lr])
# Train the model
model = pipeline.fit(train_data)
# Make predictions on the test set
predictions = model.transform(test_data)
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error on Test Set: {mse}")
Because the `VectorAssembler()` now writes to the new column "my_features" and the pipeline uses that column instead of "features", the pipeline no longer conflicts with an existing "features" column when it transforms the DataFrame.
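If you would rather not hard-code "my_features", a small helper can pick a name that is not taken yet. This is only a sketch: `pick_free_column_name` is a hypothetical function of my own, not a PySpark API, and the `_1`, `_2` suffix scheme is just an assumption for illustration.

```python
def pick_free_column_name(existing_columns, base="features"):
    """Return base if it is unused, otherwise base_1, base_2, and so on."""
    existing = set(existing_columns)
    if base not in existing:
        return base
    suffix = 1
    while f"{base}_{suffix}" in existing:
        suffix += 1
    return f"{base}_{suffix}"


# A DataFrame that already carries a "features" column from an earlier transform
columns = ["bedrooms", "square_footage", "price", "features"]
print(pick_free_column_name(columns))  # features_1
```

You would then pass the returned name to both `VectorAssembler(outputCol=...)` and `LinearRegression(featuresCol=...)`, using `df.columns` as the input list, so the two stages stay in sync.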