04-04-2023 03:27 AM
Hi,
I am running several linear regressions on my DataFrame: for every unique value in the column "items" I fit a regression, apply the model to a new dataset (vector_new), and union the results as the loop runs. The performance is very poor. Is there another method that would speed up the process?
Here is a snippet of the code:
```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, lit

emptyRDD = spark.sparkContext.emptyRDD()
lm_results = spark.createDataFrame(emptyRDD, schema)
vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
for i in items:
    df_item = df.where(col("items") == i)
    vector_df = vectorAssembler.transform(df_item)
    premodel_df = vector_df.select(['features', 'Y'])
    # Label is the observed column Y; predictions are written to y_pred
    lr = LinearRegression(featuresCol='features', labelCol='Y', predictionCol='y_pred',
                          maxIter=10, regParam=0, elasticNetParam=0)
    lr_model = lr.fit(premodel_df)
    r_squared = lr_model.summary.r2
    result = (lr_model.transform(vector_new).select("x", "y_pred")
              .withColumn("item", lit(i)))
    lm_results = lm_results.union(result)
```
04-06-2023 06:57 PM
@Marcela Bejarano:
One approach to speed up the process is to avoid using a loop and instead use Spark's groupBy and map functions. Here is an example:
```python
from functools import reduce

import numpy as np
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def fit_line(points):
    # Ordinary least-squares fit of Y on x for one item's rows.
    # np.polyfit with deg=1 returns [slope, intercept].
    xs = [p['x'] for p in points]
    ys = [p['Y'] for p in points]
    slope, intercept = np.polyfit(xs, ys, 1)
    return float(slope), float(intercept)


# Group by the 'items' column and fit one model per group.
# Spark ML estimators (VectorAssembler, LinearRegression) cannot run inside
# rdd.map because the SparkSession only exists on the driver, so the
# per-group fit uses NumPy on the executors instead.
# Collecting (x, Y) structs keeps each x paired with its own Y.
lr_models = df.groupBy('items').agg(
    F.collect_list(F.struct('x', 'Y')).alias('points')
).rdd.map(lambda row: (
    row['items'],
    fit_line(row['points'])
)).collectAsMap()

# Apply each item's coefficients to the new dataset
results = [
    vector_new.select('x')
    .withColumn('y_pred', F.lit(intercept) + F.lit(slope) * F.col('x'))
    .withColumn('item', F.lit(item))
    for item, (slope, intercept) in lr_models.items()
]

# lm_results now contains the results of all linear regressions
lm_results = reduce(DataFrame.unionByName, results)
```
This approach groups the data by the "items" column with groupBy, then fits a linear regression to each group inside the map function. The fitted models are collected into a dictionary keyed by item. Finally, each model is applied to the new dataset and the per-item results are combined with union.
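Because the model has a single feature `x` and no regularization (`regParam=0`), each per-item fit is ordinary least squares with an intercept, which has a closed form. A sketch of the formulas, assuming (as in the question's `premodel_df`) that the label column is `Y`:

```latex
\hat\beta_1 = \frac{\operatorname{cov}(x, Y)}{\operatorname{var}(x)}, \qquad
\hat\beta_0 = \bar{Y} - \hat\beta_1 \bar{x}, \qquad
R^2 = \operatorname{corr}(x, Y)^2, \qquad
\hat{y}_{\text{new}} = \hat\beta_0 + \hat\beta_1 x_{\text{new}}
```

Every term is a per-group aggregate, and `pyspark.sql.functions` provides `covar_samp`, `var_samp`, `avg` and `corr`, so for this one-feature case all items could in principle be fitted in a single `df.groupBy("items").agg(...)` with no loop and no Spark ML estimator. The sample-versus-population normalization cancels in the slope ratio, and R² equals the squared correlation only for simple (one-feature) regression with an intercept.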
Note that collectAsMap brings all results back to the driver as a Python dictionary, so they must fit in driver memory. If the number of unique items is very large, you may need an approach that keeps the results distributed.
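One option that keeps everything distributed is `GroupedData.applyInPandas` (Spark 3.0+, requires PyArrow). It runs a pandas function once per group on the executors and returns a Spark DataFrame, so nothing is collected to the driver. The sketch below assumes the label column is `Y`, that `items` is a string column, and that every item has at least two distinct `x` values; `fit_ols` and `FIT_SCHEMA` are hypothetical names. The function itself is plain pandas/NumPy, so it can be checked locally before wiring it into Spark:

```python
import numpy as np
import pandas as pd

# Hypothetical output schema (DDL string) passed to applyInPandas
FIT_SCHEMA = "items string, intercept double, slope double, r2 double"


def fit_ols(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit Y = intercept + slope * x on one item's rows (ordinary least squares).

    Assumes columns 'items', 'x' and 'Y' and at least two distinct x values.
    Returns a one-row DataFrame matching FIT_SCHEMA.
    """
    x = pdf["x"].to_numpy(dtype=float)
    y = pdf["Y"].to_numpy(dtype=float)
    slope, intercept = np.polyfit(x, y, 1)  # degree-1 fit returns [slope, intercept]
    residuals = y - (intercept + slope * x)
    ss_res = float(np.sum(residuals ** 2))
    ss_tot = float(np.sum((y - y.mean()) ** 2))
    # R^2 is undefined when Y is constant within the group
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan")
    return pd.DataFrame(
        {
            "items": [pdf["items"].iloc[0]],
            "intercept": [float(intercept)],
            "slope": [float(slope)],
            "r2": [r2],
        }
    )


# Spark wiring (assumes `df`, `vector_new` and `F = pyspark.sql.functions`;
# not executed here):
# coefs = df.groupBy("items").applyInPandas(fit_ols, schema=FIT_SCHEMA)
# lm_results = (vector_new.select("x").crossJoin(coefs)
#               .withColumn("y_pred", F.col("intercept") + F.col("slope") * F.col("x"))
#               .select("x", "y_pred", F.col("items").alias("item")))
```

The crossJoin reproduces the loop's behaviour of applying every item's model to every row of `vector_new`, so the output has (number of items) × (rows in `vector_new`) rows, but it is produced in one distributed job instead of one job per item.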