Linear Regression HELP! Pickle + Broadcast Variable Error

Kash
Contributor III

Hi there,

I need some help with this example. We're trying to fit a LinearRegression model in parallel for thousands of symbols per date, but when we run the code below we get a PicklingError.

Any suggestions would be much appreciated!

K

Error:

PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Code:

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
 
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
 
# Create an RDD with your data
data_rdd = spark.sparkContext.parallelize([
    ("symbol1", 1, 2, 3),
    ("symbol2", 4, 5, 6),
    ("symbol3", 7, 8, 9)
])
 
# Convert the RDD to a DataFrame
data_df = data_rdd.toDF(["Symbol", "Feature1", "Feature2", "Feature3"])
 
# Define the features column
assembler = VectorAssembler(inputCols=["Feature1", "Feature2", "Feature3"], outputCol="features")
 
# Fit models on each partition and collect the weights
def fit_model(partition):
    # Create a new linear regression model
    model = LinearRegression(featuresCol="features", labelCol="Symbol")
 
    # Create an empty list to store the weights
    weights = []
 
    # Convert the partition iterator to a list
    data_list = list(partition)
 
    # Convert the list to a DataFrame
    data_partition_df = spark.createDataFrame(data_list, data_df.columns)
 
    # Perform vector assembly
    data_partition_df = assembler.transform(data_partition_df)
 
    # Fit the model on the partition data
    fitted_model = model.fit(data_partition_df)
 
    # Get the model weights
    weights = [fitted_model.coefficients[i] for i in range(len(fitted_model.coefficients))]
 
    # Yield the weights
    yield weights
 
# Fit models on each partition and collect the weights
partition_weights = data_df.rdd.mapPartitions(fit_model).collect()
 
# Create a DataFrame with the collected weights
weights_df = spark.createDataFrame(partition_weights, ["Weight1", "Weight2", "Weight3"])
 
# Show the weights DataFrame
weights_df.show()
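For reference, fit_model above references spark, assembler, and data_df, and each of those objects holds a reference to the SparkContext, so the closure that mapPartitions ships to the executors cannot be pickled; that is what SPARK-5063 warns about. One pattern that avoids this is to fit one plain Python model per symbol with a grouped pandas UDF, so the function that runs on the workers never touches the SparkSession, DataFrames, or pyspark.ml. The sketch below assumes pandas/pyarrow and scikit-learn are available on the workers; the column names and sample rows are illustrative only:

import pandas as pd
from sklearn.linear_model import LinearRegression as SkLinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Per-symbol rows: two illustrative feature columns plus a numeric label
# (the column layout here is assumed, not taken from the real data).
data_df = spark.createDataFrame(
    [
        ("symbol1", 1.0, 2.0, 3.0),
        ("symbol1", 2.0, 3.0, 5.0),
        ("symbol2", 4.0, 5.0, 6.0),
        ("symbol2", 5.0, 6.0, 8.0),
    ],
    ["Symbol", "Feature1", "Feature2", "Label"],
)

result_schema = StructType([
    StructField("Symbol", StringType()),
    StructField("Weight1", DoubleType()),
    StructField("Weight2", DoubleType()),
])

def fit_per_symbol(group: pd.DataFrame) -> pd.DataFrame:
    # Runs on the executors: only pandas and scikit-learn are used here,
    # so no SparkSession or SparkContext has to be pickled into the closure.
    X = group[["Feature1", "Feature2"]].to_numpy()
    y = group["Label"].to_numpy()
    coef = SkLinearRegression().fit(X, y).coef_
    return pd.DataFrame({
        "Symbol": [group["Symbol"].iloc[0]],
        "Weight1": [float(coef[0])],
        "Weight2": [float(coef[1])],
    })

# One model per symbol, fitted in parallel across the cluster
weights_df = data_df.groupBy("Symbol").applyInPandas(fit_per_symbol, schema=result_schema)
weights_df.show()

With this shape, Spark handles the per-symbol parallelism itself and the driver only ever sees the resulting coefficients, so nothing driver-side needs to be serialized to the executors.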

3 REPLIES

Anonymous
Not applicable

Hi @Avkash Kana

Great to meet you, and thanks for your question!

Let's see if your peers in the community have an answer to your question. Thanks.

Kash
Contributor III

Thanks. We're eagerly waiting to see what the community thinks. We're also open to using Databricks' built-in ML technology, but we're unclear on how to use it for our use case.

Kash
Contributor III

@Vidula Khanna Can you assist?
