parallel execution with applyinpandas on partitioned table

New Contributor II


I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.

It runs on a cluster with 4 workers. I chose this size because the countries’ datasets differ in size: while the largest country is running, the other workers can handle the remaining countries one after the other.

So each worker should take on one country and, once it’s done, immediately start the next one.
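A minimal sketch of this setup, for reference. The per-country function is a hypothetical stand-in for the real tuning: it assumes numeric columns x and y and picks the polynomial degree with the lowest validation error. The names tune_country, run_tuning and RESULT_SCHEMA are illustrative, not from the original job. Each Country group arrives in tune_country as one pandas DataFrame, so a single country is always handled by a single task.

```python
import numpy as np
import pandas as pd

# Output schema as a DDL string; applyInPandas accepts a StructType or a DDL string.
RESULT_SCHEMA = "Country string, best_degree bigint, val_mse double"


def tune_country(pdf: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-country tuning: choose the polynomial degree with the lowest validation MSE."""
    pdf = pdf.sort_values("x").reset_index(drop=True)
    is_val = (pdf.index % 4) == 0  # every 4th row is used for validation
    train, val = pdf[~is_val], pdf[is_val]
    best_degree, best_mse = None, float("inf")
    for degree in (1, 2, 3):  # hypothetical hyperparameter grid
        coeffs = np.polyfit(train["x"].to_numpy(), train["y"].to_numpy(), degree)
        pred = np.polyval(coeffs, val["x"].to_numpy())
        mse = float(np.mean((pred - val["y"].to_numpy()) ** 2))
        if mse < best_mse:
            best_degree, best_mse = degree, mse
    return pd.DataFrame(
        {"Country": [pdf["Country"].iloc[0]], "best_degree": [best_degree], "val_mse": [best_mse]}
    )


def run_tuning(df):
    # df is a Spark DataFrame; Spark calls tune_country once per Country group.
    return df.groupBy("Country").applyInPandas(tune_country, schema=RESULT_SCHEMA)
```

run_tuning(spark_df) returns a Spark DataFrame with one row per country; tune_country itself can be tested locally with plain pandas.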

The data is stored in a Delta table in the hive_metastore.


So far, this worked as intended. However, I’ve read that it is beneficial to partition the table by country (because we do all operations at the country level), so we now create the table like this: df.write.partitionBy("Country").saveAsTable(..)


In other steps of the project, this brings benefits, but for the tuning it causes issues:

  1. Only 3 countries start in parallel, so at least 1 worker is always idle
  2. When one worker is done, it stays idle and waits until ALL workers are done -> we have to wait for the largest country to finish, and only then does processing continue.
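Both symptoms are consistent with how groupby("Country").applyInPandas(...) executes: the rows are shuffled by a hash of Country into shuffle partitions, each shuffle partition becomes one task, all countries in the same task are processed one after another, and a task is never split across workers. The toy simulation below (pure Python, hypothetical durations, a simplification that is not Spark’s actual scheduler) compares one country per task with six countries packed into three tasks on 4 slots.

```python
import heapq


def simulate_stage(tasks, slots):
    """Toy model of one Spark stage.

    tasks: list of tasks; each task is a list of (country, hours) pairs that the
           task processes one after another (countries sharing one shuffle partition).
    slots: number of tasks that can run at the same time.
    A free slot picks up the next waiting task, but a task is never split across slots.
    Returns (stage_end, [(start, end) per task]).
    """
    free_at = [0.0] * slots  # min-heap: time at which each slot becomes free
    spans = []
    for task in tasks:
        start = heapq.heappop(free_at)  # earliest free slot takes the next task
        end = start + sum(hours for _, hours in task)  # countries in one task run sequentially
        spans.append((start, end))
        heapq.heappush(free_at, end)
    return max(end for _, end in spans), spans


# Hypothetical durations in hours for 6 countries.
hours = {"A": 10, "B": 4, "C": 3, "D": 2, "E": 2, "F": 1}

# Expected behaviour: one country per task.
one_per_task = [[(c, h)] for c, h in hours.items()]
# Six countries hashed into three shuffle partitions (a hypothetical grouping).
packed = [[("A", 10), ("E", 2)], [("B", 4), ("C", 3)], [("D", 2), ("F", 1)]]

print(simulate_stage(one_per_task, slots=4)[0])  # 10.0: E and F start as soon as D and C finish
print(simulate_stage(packed, slots=4)[0])  # 12.0: E waits behind A, and one slot never gets a task
```

In the packed case the slot holding D and F finishes after 3 hours and has nothing left to pick up, which matches the "idle worker" symptom.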

My cluster settings are:

  • Runtime 12.2 ML (because newer versions can cause garbage-collection issues that freeze the cluster)
  • spark.task.cpus 8 (the number of CPUs per worker)
  • spark.sql.execution.arrow.enabled true
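These settings determine how many countries can be processed at once: Spark reserves spark.task.cpus cores per task, so a worker with 8 cores and spark.task.cpus = 8 runs exactly one task at a time. A quick check of the arithmetic, assuming one executor per worker that uses all 8 cores (the usual Databricks layout):

```python
# Cluster shape as described in the post.
NUM_WORKERS = 4
CORES_PER_WORKER = 8
TASK_CPUS = 8  # spark.task.cpus


def concurrent_task_slots(num_workers, cores_per_worker, task_cpus):
    """Number of tasks that can run at the same time across the cluster."""
    return num_workers * (cores_per_worker // task_cpus)


slots = concurrent_task_slots(NUM_WORKERS, CORES_PER_WORKER, TASK_CPUS)
print(NUM_WORKERS * CORES_PER_WORKER)  # 32 cores in total
print(slots)  # 4: one task at a time per worker
```

With 4 slots and 6 countries, at most 4 tasks can run concurrently, and the remaining work can only start when a slot frees up.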


I tried experimenting with spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores).

This partially helped with issue 1, but issue 2 remained, so it didn’t solve the problem.
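Raising spark.sql.shuffle.partitions can only help probabilistically: with 6 countries hashed into n shuffle partitions, some countries may still share a partition and therefore a task. Assuming each key lands in a uniformly random partition, the chance that all 6 end up in different partitions is the birthday-problem product below. This is a simplification: Spark actually uses a Murmur3 hash of the key modulo n, so for your specific country names the result is fixed for a given n. In addition, adaptive query execution (enabled by default on Databricks Runtime) may coalesce small shuffle partitions (see spark.sql.adaptive.coalescePartitions.enabled), which can put several countries into one task even when the configured n is large.

```python
from math import prod


def prob_all_distinct(num_keys, num_partitions):
    """Probability that num_keys keys land in pairwise-different partitions,
    assuming each key is hashed independently and uniformly (a simplification)."""
    if num_keys > num_partitions:
        return 0.0
    return prod((num_partitions - i) / num_partitions for i in range(num_keys))


for n in (6, 32, 200):
    print(n, round(prob_all_distinct(6, n), 3))
# prints: 6 0.015 / 32 0.608 / 200 0.927
```

Under this model, 32 partitions still give roughly a 40% chance that at least two countries share a task, which is consistent with the setting helping only partly.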


My questions are:

  • Why is that happening?
  • Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
  • Is this a bug, and should I open a ticket instead?

I hope you can help me.

Best regards,



Community Manager

Hi @julia, let’s go through your questions regarding parallel hyperparameter tuning, table partitioning, and the observed issues:

  1. Why is this happening?

    • The behavior you’re experiencing can be attributed to the way Spark processes data when using partitioned tables.
    • When you partition a table by a specific column (in your case, “Country”), Spark creates separate directories for each partition. These directories contain the data specific to that partition.
    • During parallel processing, Spark assigns tasks to workers based on partitions. Since you’ve partitioned the table by country, each worker handles data for a specific country.
    • However, this partitioning strategy can lead to the observed issues:
      • Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle.
      • Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion.
  2. Does the partitioning make sense?

    • Yes, partitioning by country makes sense if most of your operations are performed at the country level. It allows for efficient data retrieval and filtering.
    • However, the trade-off is that it can lead to uneven workload distribution among workers, as you’ve observed.
    • To address this, consider the following approaches:
  3. Solving the issues while keeping the table partitioned:

    • Dynamic Partitioning: Instead of creating a fixed number of partitions (one per country), consider dynamically adjusting the number of partitions based on the available resources. This way, you can allocate more workers to larger countries.
    • Parallelism Control: Experiment with the spark.sql.shuffle.partitions setting. While setting it to the number of cores (32 in your case) is a good start, you can fine-tune it further. Try different values to find the optimal balance between parallelism and resource utilization.
    • Job Scheduling: Implement a custom job scheduler that assigns tasks to workers dynamically based on their availability. For example, if a worker finishes processing one country, it can immediately start processing the next available country.
    • Task Prioritization: Prioritize tasks based on country size. Start processing the largest country first to minimize overall job completion time.
    • Resource Allocation: Ensure that each worker has sufficient resources (CPU, memory) to handle its assigned task efficiently.
  4. Is this a bug?

    • It’s not a bug but rather a consequence of how Spark handles partitioned data.
    • Consider opening a ticket or discussing this behaviour with the Spark community to explore potential enhancements or best practices for handling such scenarios.
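One way to get the "start the next country as soon as a worker is free" behaviour, in the spirit of the Job Scheduling and Task Prioritization points above, is to drive the countries from the driver with a thread pool: each thread submits a separate Spark job for one country, largest country first. This is a sketch under assumptions (country sizes are known up front; run_country, the table name, the tuning function and the schema are hypothetical), not a built-in Spark feature. Because each job filters on Country, the partitioned table can be pruned to a single directory per job.

```python
from concurrent.futures import ThreadPoolExecutor


def run_largest_first(country_sizes, run_country, max_parallel=4):
    """Run one job per country from a thread pool, largest country first.

    country_sizes: dict mapping country -> size (e.g. row count); used only for ordering.
    run_country:   callable doing the work for one country (hypothetical).
    max_parallel:  number of countries in flight at once (e.g. number of workers).
    Whenever a thread finishes a country, it picks up the next one in the queue.
    """
    order = sorted(country_sizes, key=country_sizes.get, reverse=True)
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {country: pool.submit(run_country, country) for country in order}
        return {country: fut.result() for country, fut in futures.items()}


# Hypothetical Spark version of run_country (not executed here); each call is its own Spark job:
# def run_country(country):
#     return (spark.table("my_db.my_table")                      # hypothetical table name
#             .filter(F.col("Country") == country)               # prunes to one partition directory
#             .groupBy("Country")
#             .applyInPandas(my_tune_fn, schema=my_schema)       # hypothetical function and schema
#             .toPandas())
```

With max_parallel set to the number of workers (4 here), up to four countries run at once, and a freed thread immediately submits the next country instead of waiting for the largest one.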

New Contributor II


Your answer only restates the points that confuse me about the behaviour of my job:

  • "Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."
    --> If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?
  • "Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion."
    --> Yes, I intended the overall job completion time to depend on the largest country. However, why are the other partitions blocked from executing during the same function call?

I would appreciate it if someone could actually read through the post and tell me whether there is anything I can do.