@Marcela Bejarano:
One approach to speed up the process is to avoid filtering the DataFrame once per item in a loop and instead use Spark's groupBy and collect_list functions, so the full dataset is scanned only once. Here is an example:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

vectorAssembler = VectorAssembler(inputCols=['x'], outputCol='features')
lr = LinearRegression(featuresCol='features', labelCol='y_pred',
                      maxIter=10, regParam=0, elasticNetParam=0)

# Group by the 'items' column, collecting each group's x and y values
# into lists, and bring the (much smaller) grouped result back to the
# driver. The fitting itself has to happen on the driver, because
# Spark ML estimators cannot run inside executor-side code such as rdd.map.
grouped = df.groupBy('items').agg(
    F.collect_list('x').alias('x'),
    F.collect_list('y_pred').alias('y')
).collect()

# Fit one linear regression per item, keyed by item in a dictionary
lr_models = {
    row['items']: lr.fit(
        vectorAssembler.transform(
            spark.createDataFrame(zip(row['x'], row['y']), schema=['x', 'y_pred'])
        ).select('features', 'y_pred')
    )
    for row in grouped
}
# Apply each model to the new dataset; the assembler rebuilds the
# 'features' column that the trained models expect
new_features = vectorAssembler.transform(vector_new.select('x'))
result = None
for item, lr_model in lr_models.items():
    temp_result = lr_model.transform(new_features)
    temp_result = temp_result.withColumnRenamed('prediction', 'y_pred').withColumn('item', lit(item))
    if result is None:
        result = temp_result
    else:
        result = result.union(temp_result)

# lm_results now contains the predictions of every per-item regression
lm_results = result
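As a side note, the incremental None check above can be replaced with functools.reduce, which unions a list of per-item prediction DataFrames in one expression (same result, just more compact):

from functools import reduce

# Build one prediction DataFrame per item, then union them all
predictions = [
    lr_model.transform(new_features)
            .withColumnRenamed('prediction', 'y_pred')
            .withColumn('item', lit(item))
    for item, lr_model in lr_models.items()
]
lm_results = reduce(lambda a, b: a.union(b), predictions)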
This approach groups the data by the "items" column using groupBy and collect_list, so Spark scans the dataset once instead of filtering it once per item. The grouped rows are collected to the driver, where one linear regression is fit per group; the trained models end up in a dictionary where the key is the item and the value is the model. The models are then applied to the new dataset and the per-item predictions are combined using the union function.
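For example, a quick sanity check on one of the fitted models (the key 'item_a' below is a placeholder; substitute a real value from your 'items' column):

model = lr_models['item_a']  # hypothetical key, use one of your actual items
print(model.coefficients, model.intercept)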
Note that this approach uses the collect function, so the grouped data (one list of x and y values per item) and the dictionary of trained models must fit in driver memory. If the number of unique items or the size of each group is very large, you may need to use a different approach.
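In that case, one option (available in Spark 3.0+) is a grouped-map pandas UDF via applyInPandas, which fits each group's regression on the executors and returns only the coefficients, so nothing large is collected to the driver. This is a minimal sketch under a few assumptions: numpy.polyfit stands in for the regression (Spark ML estimators cannot be trained inside a UDF), the column names are the same df columns as above, and 'items' is assumed to be a string column:

import numpy as np
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# One output row per item: the slope and intercept of its fitted line
schema = StructType([
    StructField('items', StringType()),
    StructField('slope', DoubleType()),
    StructField('intercept', DoubleType()),
])

def fit_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Ordinary least squares on a single item's data
    slope, intercept = np.polyfit(pdf['x'], pdf['y_pred'], deg=1)
    return pd.DataFrame({'items': [pdf['items'].iloc[0]],
                         'slope': [slope],
                         'intercept': [intercept]})

coeffs = df.groupBy('items').applyInPandas(fit_group, schema=schema)

Because the coefficients come back as an ordinary DataFrame, predictions for the new data can then be computed with a join and a column expression (y_pred = slope * x + intercept) instead of a per-model transform loop.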