Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Linear Regression HELP! Pickle + Broadcast Variable Error

Kash
Contributor III

Hi there,

I need some help with this example. We're trying to fit a LinearRegression model in parallel for thousands of symbols per date. When we run this, we get a PicklingError.

Any suggestions would be much appreciated!

K

Error:

PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Code:

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
 
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
 
# Create an RDD with your data
data_rdd = spark.sparkContext.parallelize([
    ("symbol1", 1, 2, 3),
    ("symbol2", 4, 5, 6),
    ("symbol3", 7, 8, 9)
])
 
# Convert the RDD to a DataFrame
data_df = data_rdd.toDF(["Symbol", "Feature1", "Feature2", "Feature3"])
 
# Define the features column
assembler = VectorAssembler(inputCols=["Feature1", "Feature2", "Feature3"], outputCol="features")
 
# Fit models on each partition and collect the weights
def fit_model(partition):
    # Create a new linear regression model
    model = LinearRegression(featuresCol="features", labelCol="Symbol")
 
    # Create an empty list to store the weights
    weights = []
 
    # Convert the partition iterator to a list
    data_list = list(partition)
 
    # Convert the list to a DataFrame
    data_partition_df = spark.createDataFrame(data_list, data_df.columns)
 
    # Perform vector assembly
    data_partition_df = assembler.transform(data_partition_df)
 
    # Fit the model on the partition data
    fitted_model = model.fit(data_partition_df)
 
    # Get the model weights
    weights = [fitted_model.coefficients[i] for i in range(len(fitted_model.coefficients))]
 
    # Yield the weights
    yield weights
 
# Fit models on each partition and collect the weights
partition_weights = data_df.rdd.mapPartitions(fit_model).collect()
 
# Create a DataFrame with the collected weights
weights_df = spark.createDataFrame(partition_weights, ["Weight1", "Weight2", "Weight3"])
 
# Show the weights DataFrame
weights_df.show()
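The error comes from referencing driver-side objects (`spark`, `data_df`, `assembler`, and Spark ML's `LinearRegression`) inside `fit_model`, which runs on the executors; Spark tries to pickle them for the workers and raises SPARK-5063. One common pattern for "one model per symbol" is a grouped-map pandas UDF that fits a single-node model (e.g. scikit-learn) per group, so nothing Spark-side is touched on the workers. Below is a minimal sketch, assuming pandas, pyarrow, and scikit-learn are installed on the cluster and treating `Feature3` as the label (the original `labelCol="Symbol"` would not work, since the label must be numeric); `fit_per_symbol` and `result_schema` are hypothetical names, and with the toy data above each symbol has only one row, so the fit is degenerate.

import pandas as pd
from sklearn.linear_model import LinearRegression as SkLinearRegression

# Schema of the per-symbol coefficients returned by the grouped-map function.
result_schema = "Symbol string, Weight1 double, Weight2 double"

def fit_per_symbol(pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs on a worker; only pandas/scikit-learn are used here --
    # no SparkSession, Spark DataFrame, or Spark ML objects are referenced.
    X = pdf[["Feature1", "Feature2"]].values
    y = pdf["Feature3"].values  # assumed label column
    model = SkLinearRegression().fit(X, y)
    return pd.DataFrame({
        "Symbol": [pdf["Symbol"].iloc[0]],
        "Weight1": [float(model.coef_[0])],
        "Weight2": [float(model.coef_[1])],
    })

# One independent fit per symbol, executed in parallel across the cluster.
weights_df = data_df.groupBy("Symbol").applyInPandas(fit_per_symbol, schema=result_schema)
weights_df.show()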

3 REPLIES

Anonymous
Not applicable

Hi @Avkash Kana

Great to meet you, and thanks for your question!

Let's see if your peers in the community have an answer to your question. Thanks.

Kash
Contributor III

Thanks. We're eager to see what the community thinks. We're also open to using Databricks' built-in ML tooling, but we're unclear how to apply it to our use case.
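If the goal is to stay with Spark ML's built-in `LinearRegression`, the fit has to be driven from the driver rather than inside `mapPartitions`. A simple (if sequential) pattern is to loop over symbols on the driver and fit one model per filtered DataFrame; this is only a sketch, again using `Feature3` as a stand-in numeric label, and for thousands of symbols the grouped pandas approach above will scale better because each loop iteration here runs one Spark job at a time.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=["Feature1", "Feature2"], outputCol="features")

# Collect the distinct symbols to the driver, then fit one Spark ML model per symbol.
symbols = [row["Symbol"] for row in data_df.select("Symbol").distinct().collect()]

per_symbol_weights = {}
for sym in symbols:
    subset = assembler.transform(data_df.filter(data_df.Symbol == sym))
    lr = LinearRegression(featuresCol="features", labelCol="Feature3")  # assumed numeric label
    fitted = lr.fit(subset)
    per_symbol_weights[sym] = list(fitted.coefficients)

print(per_symbol_weights)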

Kash
Contributor III

@Vidula Khanna Can you assist?
