04-04-2023 03:27 AM
Hi,
I am running several linear regressions on my dataframe: for every unique value in the column "item", I fit a regression, apply the model to a new dataset (vector_new), and union the results as the loop runs. The problem is that performance is very poor. Is there any other method I can use to speed up the process?
Here is a snippet of the code:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, lit

emptyRDD = spark.sparkContext.emptyRDD()
lm_results = spark.createDataFrame(emptyRDD, schema)

for i in items:
    df_item = df.where(col("items") == i)
    vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
    vector_df = vectorAssembler.transform(df_item)
    premodel_df = vector_df.select(['features', 'y_pred'])
    lr = LinearRegression(featuresCol='features', labelCol='y_pred',
                          maxIter=10, regParam=0, elasticNetParam=0)
    lr_model = lr.fit(premodel_df)
    r_squared = lr_model.summary.r2
    result = (lr_model.transform(vector_new)
              .select(col("x"), col("prediction").alias("y_pred"))
              .withColumn("item", lit(i)))
    lm_results = lm_results.union(result)
- Labels: Parallel
Accepted Solutions

04-06-2023 06:57 PM
@Marcela Bejarano:
One approach to speeding this up is to avoid the Python loop and instead use Spark's groupBy and map functions. Here is an example:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')

# Group by 'items' and collect each group's data, then fit one
# linear regression per item and collect the models to the driver
lr_models = df.groupBy('items').agg(
    F.collect_list('x').alias('x'),
    F.collect_list('y_pred').alias('y')
).rdd.map(lambda row: (
    row['items'],
    LinearRegression(featuresCol='features', labelCol='y_pred',
                     maxIter=10, regParam=0, elasticNetParam=0).fit(
        vectorAssembler.transform(
            spark.createDataFrame(zip(row['x'], row['y']),
                                  schema=['x', 'y_pred'])
        ).select('features', 'y_pred')
    )
)).collectAsMap()
# Apply each fitted model to the new dataset; the models expect a
# 'features' column, so assemble it first
result = None
for item, lr_model in lr_models.items():
    new_features = vectorAssembler.transform(vector_new.select('x'))
    temp_result = (lr_model.transform(new_features)
                   .select('x', 'prediction')
                   .withColumnRenamed('prediction', 'y_pred')
                   .withColumn('item', lit(item)))
    if result is None:
        result = temp_result
    else:
        result = result.union(temp_result)

# lm_results now contains the results of all the per-item regressions
lm_results = result
This approach groups the data by the "items" column with groupBy, fits a linear regression for each group inside map, and collects the trained models into a dictionary keyed by item. The models are then applied to the new dataset, and the per-item results are combined with union.
Note that this approach uses collectAsMap, which gathers all of the trained models into driver memory. If the number of unique items is very large, you may need a different approach that keeps the fitting on the executors, as sketched below.
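For instance, here is a minimal sketch using groupBy plus applyInPandas, assuming pandas and scikit-learn are available on the cluster and reusing the column names from the example above. Because the regression here is univariate, each group only needs to return its slope and intercept, and the new dataset can then be scored with plain column arithmetic:
import pandas as pd
from sklearn.linear_model import LinearRegression as SkLinearRegression
from pyspark.sql.functions import col

def fit_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows for one item; the fit happens on an executor,
    # so no model ever has to be collected to the driver
    model = SkLinearRegression().fit(pdf[['x']], pdf['y_pred'])
    return pd.DataFrame({'item': [pdf['items'].iloc[0]],
                         'slope': [float(model.coef_[0])],
                         'intercept': [float(model.intercept_)]})

coefs = df.groupBy('items').applyInPandas(
    fit_group, schema='item string, slope double, intercept double')

# Score the new dataset for every item without collecting any models
lm_results = (vector_new.select('x').crossJoin(coefs)
              .select('item', 'x',
                      (col('slope') * col('x') + col('intercept')).alias('y_pred')))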

03-29-2024 06:03 AM
Hi,
Thank you for the idea. I implemented a similar program that tries to run GeneralizedLinearRegression for each group, but I keep getting the error PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object.
Any idea what causes this and how to fix it? I really appreciate the help!
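A likely cause, given the pattern above: the function passed to rdd.map closes over driver-side objects such as the SparkSession (spark), which hold internal locks that cannot be pickled, so Spark fails when it ships the closure to the executors. One pattern that avoids this is to fit each group with a single-node library inside applyInPandas, so nothing from the driver is serialized. A minimal sketch using statsmodels follows; the Gaussian family here is an assumption standing in for whatever GLM family is actually needed:
import pandas as pd
import statsmodels.api as sm

def fit_glm(pdf: pd.DataFrame) -> pd.DataFrame:
    # Everything here runs on an executor and references only pandas
    # and statsmodels, so there is no Spark object to pickle
    X = sm.add_constant(pdf[['x']])
    model = sm.GLM(pdf['y_pred'], X, family=sm.families.Gaussian()).fit()
    out = pdf[['items', 'x']].copy()
    out['prediction'] = model.predict(X)
    return out

predictions = df.groupBy('items').applyInPandas(
    fit_glm, schema='items string, x double, prediction double')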

