Running multiple linear regressions in parallel (speeding up for loop)

mbejarano89
New Contributor III

Hi,

I am running several linear regressions on my dataframe: for every unique value in the column "items", I fit a regression, apply the model to a new dataset (vector_new), and union the results as the loop runs. The performance is very poor. Is there another method that would speed up the process?

Here is a snippet of the code:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, lit

emptyRDD = spark.sparkContext.emptyRDD()
lm_results = spark.createDataFrame(emptyRDD, schema)

for i in items:
    # Keep only the rows for the current item
    df_item = df.where(col("items") == i)

    vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
    vector_df = vectorAssembler.transform(df_item)
    premodel_df = vector_df.select(['features', 'y_pred'])
    lr = LinearRegression(featuresCol='features', labelCol='y_pred', maxIter=10, regParam=0, elasticNetParam=0)
    lr_model = lr.fit(premodel_df)
    r_squared = lr_model.summary.r2

    # Score the new dataset with this item's model and tag the rows with the item
    result = (lr_model.transform(vector_new)
                      .select("x", col("prediction").alias("y_pred"))
                      .withColumn("item", lit(i)))

    lm_results = lm_results.union(result)

1 ACCEPTED SOLUTION

Anonymous
Not applicable

@Marcela Bejarano:

One approach to speed up the process is to avoid using a loop and instead use Spark's groupBy and map functions. Here is an example:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
 
vectorAssembler = VectorAssembler(inputCols = ['x'], outputCol = 'features')
 
# Group by 'items' column and apply the linear regression model
# to each group
lr_models = df.groupBy('items').agg(
    F.collect_list('x').alias('x'),
    F.collect_list('y_pred').alias('y')
).rdd.map(lambda row: (
    row['items'], 
    LinearRegression(featuresCol='features', labelCol='y_pred', maxIter=10, regParam=0, elasticNetParam=0).fit(
        vectorAssembler.transform(
            spark.createDataFrame(zip(row['x'], row['y']), schema=['x', 'y_pred'])
        ).select('features', 'y_pred')
    )
)).collectAsMap()
 
# Apply the models to the new dataset
result = None
for item, lr_model in lr_models.items():
    # Keep 'features' for scoring: the model reads that column in transform()
    temp_result = lr_model.transform(vector_new.select('x', 'features')).drop('features')
    temp_result = temp_result.withColumnRenamed('prediction', 'y_pred').withColumn('item', lit(item))
    if result is None:
        result = temp_result
    else:
        result = result.union(temp_result)
 
# lm_results now contains the results of all linear regressions
lm_results = result

This approach groups the data by the "items" column using the groupBy function, then applies the linear regression model to each group using the map function. The results are collected into a dictionary, where the key is the item and the value is the trained model. Then, the models are applied to the new dataset and the results are combined using the union function.
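The same group-then-fit idea can also be sketched with groupBy(...).applyInPandas (available in Spark 3.0 and later), which runs an ordinary Python function once per item on the executors. This is only a sketch under assumptions: NEW_X, fit_and_predict and run_per_item are hypothetical names, the item values are assumed to be strings, the new x values are assumed to be few enough to ship as a plain list, and NumPy's least-squares fit stands in for pyspark.ml's LinearRegression.

```python
import numpy as np
import pandas as pd

# Hypothetical new x values to score; in the thread these live in vector_new.
NEW_X = [1.0, 2.0, 3.0]


def fit_and_predict(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit y_pred ~ x for one item and score NEW_X with that item's line."""
    # Degree-1 least squares; polyfit returns (slope, intercept).
    slope, intercept = np.polyfit(pdf["x"], pdf["y_pred"], 1)
    new_x = np.asarray(NEW_X, dtype=float)
    return pd.DataFrame({
        "item": pdf["items"].iloc[0],  # every row in the group shares one item
        "x": new_x,
        "y_pred": slope * new_x + intercept,
    })


def run_per_item(df):
    """df is a Spark DataFrame with columns items, x and y_pred."""
    # Assumption: items are strings; adjust the schema if they are not.
    return df.groupBy("items").applyInPandas(
        fit_and_predict, schema="item string, x double, y_pred double"
    )
```

Calling run_per_item(df) would return a single Spark DataFrame with one block of predictions per item, so no driver-side loop or union is needed. Each item needs at least two distinct x values for the fit to be well defined.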

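Note that collectAsMap brings every fitted model back to the driver as a dictionary, so all of the models must fit in driver memory. If the number of unique items is very large, you may need a different approach to collect the results. Also be aware that the SparkSession and the pyspark.ml estimators are only usable on the driver, so calling spark.createDataFrame and LinearRegression.fit inside rdd.map will typically fail when the lambda is shipped to the executors. A per-group fit that runs on the executors needs a plain Python library instead.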
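One way to avoid collecting anything to the driver is to keep only the fitted coefficients, as a small Spark DataFrame with one row per item, and to score new data with a join. The sketch below assumes Spark 3.0 or later, string-valued items, and a single numeric feature x. The names fit_coefficients and score_new_data are hypothetical, and NumPy's least-squares fit is used in place of pyspark.ml's LinearRegression.

```python
import numpy as np
import pandas as pd


def fit_coefficients(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return one row (item, slope, intercept, r2) for a single item's data."""
    x = pdf["x"].to_numpy(dtype=float)
    y = pdf["y_pred"].to_numpy(dtype=float)
    slope, intercept = np.polyfit(x, y, 1)
    residual = y - (slope * x + intercept)
    centered = y - y.mean()
    ss_tot = float(centered @ centered)
    # R^2 is undefined when y is constant within the item; report NaN there.
    r2 = 1.0 - float(residual @ residual) / ss_tot if ss_tot > 0 else float("nan")
    return pd.DataFrame({
        "item": [pdf["items"].iloc[0]],
        "slope": [slope],
        "intercept": [intercept],
        "r2": [r2],
    })


def score_new_data(df, new_df):
    """df and new_df are Spark DataFrames; new_df only needs an 'x' column."""
    coefs = df.groupBy("items").applyInPandas(
        fit_coefficients,
        schema="item string, slope double, intercept double, r2 double",
    )
    # Every item's line is evaluated at every new x, mirroring the original loop.
    return (
        new_df.select("x")
        .crossJoin(coefs)
        .selectExpr("item", "x", "slope * x + intercept AS y_pred")
    )
```

The coefficient table stays distributed and also carries the per-item R², which the original loop computed but did not keep.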
