Parallelizing processing of multiple spark dataframes

Dhruv_Sinha
New Contributor II

Hi all, I am trying to create a collection RDD that contains a list of Spark DataFrames, so that I can parallelize the cleaning process for each of them. Later on, I am sending each of these DataFrames to another method. However, when I parallelize, I get an error that the SparkContext cannot be accessed from worker nodes. I understand the error, but I wanted to learn if there is a way around it.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def import_data(code):
    # assume that full_path is derived from `code` and that model_df
    # is imported successfully
    model_df = (
        spark
        .read
        .parquet(full_path)
    )
    return model_df


list_code = [59, 48]
input_list = []
for code in list_code:
    input_dict = {}
    model_df = import_data(code)
    input_dict[code] = model_df
    input_list.append(input_dict)

sc = spark.sparkContext
# This is where the error surfaces: each DataFrame holds a reference to the
# SparkContext, which cannot be serialized and shipped to the worker nodes
collection_rdd = sc.parallelize(input_list)

2 REPLIES

Kaniz
Community Manager

Hi @Dhruv_Sinha, the issue you're encountering, where the Spark context (sc) cannot be accessed from worker nodes, is a common challenge when working with Spark.

Let’s explore why this happens and discuss potential workarounds.

  1. Spark Context and Worker Nodes:

    • The Spark context (sc) is created on the driver node, which is the machine where your Spark application starts.
    • When you parallelize data using sc.parallelize(), Spark distributes the data across worker nodes (executors) in the cluster.
    • However, the Spark context (sc) is not automatically available on worker nodes because they run in separate JVMs (Java Virtual Machines).
  2. Why You Encounter the Error:

    • When you try to access sc within a transformation (such as map or foreach), it fails because the worker nodes don't have direct access to the driver's Spark context (a minimal failing sketch appears after this list).
    • This limitation is by design to prevent issues related to distributed computing and data consistency.
  3. Workarounds:

    • While direct access to sc from worker nodes is not possible, you can use alternative approaches:
      • Broadcast Variables: If you need to share read-only data (like configuration settings) with worker nodes, consider using broadcast variables. These are efficiently distributed to all nodes.
      • Accumulators: For aggregations or counters, use accumulators. They allow worker nodes to update shared variables in a distributed manner (a short accumulator sketch appears at the end of this reply).
      • Driver-side Processing: Perform any driver-side processing (such as collecting results) before distributing data to worker nodes.
      • Map-Only Operations: If possible, structure your operations as map-only transformations (without relying on the Spark context).
  4. Example Using Broadcast Variables:

    • Suppose you want to pass some configuration settings to worker nodes:
      # Create a broadcast variable
      config_settings = {"key1": "value1", "key2": "value2"}
      broadcast_settings = sc.broadcast(config_settings)
      
      def process_dataframe(df):
          # Access the broadcast variable (read-only) on the worker
          settings = broadcast_settings.value
          # Your processing logic here
          # ...
          return df
      
      # Apply process_dataframe to each dataframe in your collection_rdd
      result_rdd = collection_rdd.map(process_dataframe)
      
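To make points 1 and 2 concrete, here is a minimal sketch of the failing pattern; the paths are hypothetical and the offending call is left commented out so the snippet itself does not crash:

# Hypothetical paths, for illustration only
paths_rdd = sc.parallelize(["/data/xx_df", "/data/yy_df"])

def load_on_worker(path):
    # `spark` (and its SparkContext) live only on the driver, so this call
    # cannot run inside a transformation that executes on a worker
    return spark.read.parquet(path).count()

# counts = paths_rdd.map(load_on_worker).collect()
# -> fails, because the worker cannot reference the driver's SparkContext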

Remember that Spark is designed for distributed processing, and understanding its execution model is crucial for efficient and correct development. Feel free to adapt the suggested approaches based on your specific use case! 🚀
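As a brief postscript on the accumulator workaround in point 3, here is an equally minimal sketch; the data and the cleaning rule are purely illustrative:

# Driver-side: create an accumulator that executor-side code can add to
dropped = sc.accumulator(0)

def keep(value):
    if value is None:      # stand-in for a real cleaning rule
        dropped.add(1)     # executors may only add; the driver reads .value
        return False
    return True

cleaned = sc.parallelize([1, None, 2, None, 3]).filter(keep)
cleaned.count()            # an action must run before the accumulator is populated
print(dropped.value)       # -> 2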

Dhruv_Sinha
New Contributor II

Dear @Kaniz, thank you very much for your prompt response. This is a very detailed answer and I really appreciate all your help. Let me describe my problem more specifically. I have several datasets stored in parquet format, named 'xx_df', 'yy_df', etc. I want to read these datasets as Spark DataFrames and perform some cleaning on them. For example, I want to remove all the columns in each dataset that have more than 90% null values. Following that, I want to train a separate machine-learning model on each dataset.

So, I want to understand how I can parallelize the reading and processing of the parquet datasets into Spark DataFrames. I can share pseudo code with you if that would be helpful.
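
For reference, here is a minimal sketch of that workflow that respects the constraint discussed above: every spark.read and DataFrame call stays on the driver, and Python threads are used only to submit the per-dataset Spark jobs concurrently. The paths, the ThreadPoolExecutor-based overlap, and the train_model stub are assumptions for illustration, not something prescribed in this thread:

from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical parquet locations; substitute the real ones
paths = {"xx_df": "/mnt/data/xx_df", "yy_df": "/mnt/data/yy_df"}

def clean_dataset(name, path, null_threshold=0.9):
    # Runs on the driver; Spark distributes the actual work to the executors
    df = spark.read.parquet(path)
    total = df.count()
    # Null count per column, computed in a single pass over the data
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first().asDict()
    to_drop = [c for c, n in null_counts.items()
               if total > 0 and n / total > null_threshold]
    cleaned = df.drop(*to_drop)
    # train_model(cleaned)  # stub: per-dataset ML training would go here
    return name, cleaned

# Each thread submits its own Spark jobs; the cluster can run them concurrently
with ThreadPoolExecutor(max_workers=len(paths)) as pool:
    results = dict(pool.map(lambda item: clean_dataset(*item), paths.items()))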
