parallel execution with applyinpandas on partitioned table

julia
New Contributor II

Hi,

I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.

It runs on a cluster with 4 workers (chosen deliberately because the countries' datasets differ in size – while the largest country is running, the other workers can handle the remaining countries one after the other).

So each worker should take on 1 country and once it’s done, it should immediately start the next country.

The data used is stored in a Delta table in the hive_metastore.

 

So far, this worked as intended. But I've read that it is beneficial to partition the table by country (because we do all operations at the country level), so now we create the table like this: df.write.partitionBy("Country").saveAsTable(..)

 

In other steps of the project, this brings benefits, but for the tuning it causes issues:

  1. Only 3 countries start in parallel, so at least 1 worker is always idle
  2. When one worker is done, it sits idle and waits until ALL workers are done -> we have to wait for the largest country to finish before the job can continue.

My cluster settings are

  • Runtime 12.2 ML (because newer versions can lead to issues with garbage collection so that the cluster freezes)
  • spark.task.cpus 8 (number of CPUs per worker)
  • spark.sql.execution.arrow.enabled true

 

I tried experimenting with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the number of cores).

This partially helped with issue 1, but issue 2 was still there, so it didn’t solve it.
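For reference, a sketch of the configuration experiment described above. The second setting rests on an assumption worth verifying on Runtime 12.2: adaptive query execution (AQE) can coalesce small shuffle partitions back together, which would merge the smaller countries into fewer tasks and undo a higher spark.sql.shuffle.partitions value.

```python
# Raise the shuffle partition count (32 = total cores in this cluster).
spark.conf.set("spark.sql.shuffle.partitions", "32")

# Assumption: AQE's partition coalescing is what merges small per-country
# groups into fewer tasks. Disabling it keeps the partitions separate;
# benchmark before relying on this in production.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```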

 

My questions are:

  • Why is that happening?
  • Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
  • Is this a bug and should I rather open a ticket?

I hope you can help me.

Best regards,

Julia

3 REPLIES

Kaniz
Community Manager

Hi @julia, let's delve into your questions regarding parallel hyperparameter tuning, table partitioning, and the observed issues:

  1. Why is this happening?

    • The behavior you’re experiencing can be attributed to the way Spark processes data when using partitioned tables.
    • When you partition a table by a specific column (in your case, “Country”), Spark creates separate directories for each partition. These directories contain the data specific to that partition.
    • During parallel processing, Spark assigns tasks to workers based on partitions. Since you’ve partitioned the table by country, each worker handles data for a specific country.
    • However, this partitioning strategy can lead to the observed issues:
      • Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle.
      • Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion.
  2. Does the partitioning make sense?

    • Yes, partitioning by country makes sense if most of your operations are performed at the country level. It allows for efficient data retrieval and filtering.
    • However, the trade-off is that it can lead to uneven workload distribution among workers, as you’ve observed.
    • To address this, consider the following approaches:
  3. Solving the issues while keeping the table partitioned:

    • Dynamic Partitioning: Instead of creating a fixed number of partitions (one per country), consider dynamically adjusting the number of partitions based on the available resources. This way, you can allocate more workers to larger countries.
    • Parallelism Control: Experiment with the spark.sql.shuffle.partitions setting. While setting it to the number of cores (32 in your case) is a good start, you can fine-tune it further. Try different values to find the optimal balance between parallelism and resource utilization.
    • Job Scheduling: Implement a custom job scheduler that assigns tasks to workers dynamically based on their availability. For example, if a worker finishes processing one country, it can immediately start processing the next available country.
    • Task Prioritization: Prioritize tasks based on country size. Start processing the largest country first to minimize overall job completion time.
    • Resource Allocation: Ensure that each worker has sufficient resources (CPU, memory) to handle its assigned task efficiently.
  4. Is this a bug?

    • It’s not a bug but rather a consequence of how Spark handles partitioned data.
    • Consider opening a ticket or discussing this behaviour with the Spark community to explore potential enhancements or best practices for handling such scenarios.

julia
New Contributor II

Hi,

This only restates the very behaviour in my job that I am confused about:

  • "Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."
    --> If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?
  • "Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion."
    --> Yes, my intention was that the overall job completion depends on the largest country. However, why are the partitions blocked from executing during the same function call?

I would appreciate it if someone could actually read through the post and tell me whether there is anything I can do.
