Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

Running multiple linear regressions in parallel (speeding up a for loop)

mbejarano89
New Contributor III

Hi,

I am running several linear regressions on my dataframe: for every unique value in the column "item" I fit a regression, apply the model to a new dataset (vector_new), and union the results as the loop runs. The problem is that performance is very poor. Is there another method I can use to speed up the process?

Here is a snippet of the code:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, lit
 
emptyRDD = spark.sparkContext.emptyRDD()
lm_results = spark.createDataFrame(emptyRDD, schema)
 
vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
 
for i in items:
    # Fit one regression on the rows belonging to this item
    df_item = df.where(col("items") == i)
    vector_df = vectorAssembler.transform(df_item)
    premodel_df = vector_df.select(['features', 'y_pred'])
    lr = LinearRegression(featuresCol='features', labelCol='y_pred',
                          maxIter=10, regParam=0, elasticNetParam=0)
    lr_model = lr.fit(premodel_df)
    r_squared = lr_model.summary.r2
 
    # Score the new dataset and tag the predictions with the item
    result = (lr_model.transform(vector_new)
                      .select("x", "y_pred")
                      .withColumn("item", lit(i)))
 
    lm_results = lm_results.union(result)

1 ACCEPTED SOLUTION

Anonymous
Not applicable

@Marcela Bejarano:

One approach to speed up the process is to avoid filtering and refitting inside a loop over the full DataFrame and instead use Spark's groupBy to gather each item's data in a single pass. Here is an example:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
 
vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
 
# Gather each item's observations into lists in a single pass, then
# bring the grouped rows back to the driver. Spark ML estimators and
# spark.createDataFrame can only be used on the driver, not inside
# rdd.map on the executors.
grouped = df.groupBy('items').agg(
    F.collect_list('x').alias('x'),
    F.collect_list('y_pred').alias('y')
).collect()
 
# Fit one model per item and keep the models in a dictionary keyed by item
lr_models = {}
for row in grouped:
    item_df = vectorAssembler.transform(
        spark.createDataFrame(list(zip(row['x'], row['y'])), schema=['x', 'y_pred'])
    ).select('features', 'y_pred')
    lr = LinearRegression(featuresCol='features', labelCol='y_pred',
                          maxIter=10, regParam=0, elasticNetParam=0)
    lr_models[row['items']] = lr.fit(item_df)
 
# Apply each per-item model to the new dataset
result = None
for item, lr_model in lr_models.items():
    scored = lr_model.transform(vectorAssembler.transform(vector_new.select('x')))
    scored = (scored.select('x', 'prediction')
                    .withColumnRenamed('prediction', 'y_pred')
                    .withColumn('item', lit(item)))
    result = scored if result is None else result.union(scored)
 
# lm_results now contains the results of all the linear regressions
lm_results = result

This approach groups the data by the "items" column using groupBy and collect_list, so each item's observations are gathered in a single pass over the data instead of one filtered scan per item. The grouped rows are collected to the driver, one model is fitted per item, and the trained models are kept in a dictionary where the key is the item and the value is the model. The models are then applied to the new dataset and the results are combined using union.

Note that this approach collects the grouped training data into driver memory and fits the models one after another on the driver. If the number of unique items or the amount of data per item is very large, you may need a different approach.
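One such alternative is to let the executors do the fitting with groupBy().applyInPandas() and a single-node library. Below is a minimal sketch of that pattern, assuming df has columns 'items', 'x', and 'y_pred' and that scikit-learn is available on the cluster; scikit-learn models are plain Python objects, so they can safely be used inside the grouped function:

import pandas as pd
from sklearn.linear_model import LinearRegression as SkLinearRegression
 
def fit_and_score(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each call receives every row for one item as a pandas DataFrame
    model = SkLinearRegression().fit(pdf[['x']], pdf['y_pred'])
    return pd.DataFrame({
        'item': pdf['items'],
        'x': pdf['x'],
        'y_pred': model.predict(pdf[['x']]),
    })
 
# One task per item; the per-item fits run in parallel on the executors.
# Adjust the schema types to match the actual data.
lm_results = df.groupBy('items').applyInPandas(
    fit_and_score, schema="item string, x double, y_pred double")

This sketch scores the rows it was fitted on; to score a separate dataset such as vector_new, one option is to return the per-item coefficients instead and join them to vector_new, or to feed both inputs through the cogroup variant of applyInPandas.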


2 REPLIES


simon_c
New Contributor II

Hi,

Thank you for the idea. I implemented a similar program that tries to run GeneralizedLinearRegression for each group, but I keep getting the error PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object.

Any idea what causes this and how to fix it? Really appreciate the help!
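For what it's worth, that pickling error usually means a Spark ML object crossed into executor-side code: PySpark estimators and models wrap JVM handles guarded by thread locks, so they cannot be serialized into an rdd.map or UDF closure. The applyInPandas pattern shown above avoids this because only pandas DataFrames cross the serialization boundary. A minimal GLM variant of that sketch, assuming the same 'items', 'x', and 'y_pred' columns and using a Gaussian family as a placeholder for whatever family the GeneralizedLinearRegression used:

import pandas as pd
import statsmodels.api as sm
 
def fit_glm(pdf: pd.DataFrame) -> pd.DataFrame:
    # statsmodels results are plain Python objects and pickle cleanly
    X = sm.add_constant(pdf[['x']])
    glm = sm.GLM(pdf['y_pred'], X, family=sm.families.Gaussian()).fit()
    out = pdf[['items', 'x']].rename(columns={'items': 'item'})
    out['y_pred'] = glm.predict(X)
    return out
 
lm_results = df.groupBy('items').applyInPandas(
    fit_glm, schema="item string, x double, y_pred double")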
