
parallel execution with applyinpandas on partitioned table

julia
New Contributor II

Hi,

I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.

It runs on a cluster with 4 workers, chosen this way because the countries' datasets differ in size: while the largest country is still running, the other workers can handle the remaining countries one after the other.

So each worker should take on one country and, once it's done, immediately start the next one.

The data is stored in a Delta table in the hive_metastore.
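For context, the tuning step is structured roughly like this (a minimal sketch; the table names, the result schema, and the body of the tuning function are placeholders, and spark is the SparkSession that Databricks notebooks provide):

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Placeholder schema for the tuning output of one country
result_schema = StructType([
    StructField("Country", StringType()),
    StructField("best_score", DoubleType()),
])

def tune_country(pdf: pd.DataFrame) -> pd.DataFrame:
    """Receives all rows of one country as a single pandas DataFrame."""
    country = pdf["Country"].iloc[0]
    best_score = 0.0  # ... pandas-based hyperparameter tuning happens here ...
    return pd.DataFrame({"Country": [country], "best_score": [best_score]})

df = spark.table("my_db.training_data")  # illustrative name; the real table lives in the hive_metastore
results = df.groupby("Country").applyInPandas(tune_country, schema=result_schema)
results.write.mode("overwrite").saveAsTable("my_db.tuning_results")  # illustrative name
```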

 

So far, this worked as intended. But I've read that it is beneficial to partition the table by country (because we do all operations at the country level), so now we create the table like this: df.write.partitionBy("Country").saveAsTable(..)
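Spelled out, the new table creation looks roughly like this (a sketch; the table name is illustrative, and format("delta") only makes the Databricks default explicit):

```python
(
    df.write
      .format("delta")          # Delta is already the default on Databricks; shown for clarity
      .partitionBy("Country")   # one directory of files per country value
      .mode("overwrite")
      .saveAsTable("my_db.training_data")  # illustrative table name
)
```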

 

In other steps of the project, this brings benefits, but for the tuning it causes issues:

  1. Only 3 countries start in parallel, so at least one worker is always idle.
  2. When a worker finishes its country, it sits idle until ALL workers are done, so we have to wait for the largest country to finish before anything can continue.

My cluster settings are

  • Runtime 12.2 ML (newer runtimes have led to garbage-collection issues that freeze the cluster)
  • spark.task.cpus 8 (the number of CPUs per worker)
  • spark.sql.execution.arrow.enabled true
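For reference, the effective values can be checked from a notebook on the running cluster (a small sketch; spark is the Databricks-provided session, and config values come back as strings):

```python
# Confirm the cluster-level settings at runtime
print(spark.conf.get("spark.task.cpus"))                    # expected: "8"
print(spark.conf.get("spark.sql.execution.arrow.enabled"))  # expected: "true"

# Total executor cores registered with the driver; with spark.task.cpus=8
# each 8-core worker runs only one task at a time.
print(spark.sparkContext.defaultParallelism)
```

With spark.task.cpus set to 8 on 8-core workers, one task occupies a whole worker, which matches the intended one-country-per-worker layout described above.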

 

I tried experimenting with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores in the cluster).

This partially helped with issue 1, but issue 2 remained, so it didn't solve the problem.
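For completeness, a one-line sketch of how the setting can be applied in the notebook session (it can equally be set in the cluster's Spark config):

```python
# Controls how many partitions the shuffle in front of applyInPandas produces;
# it does not change the number of task slots on the workers.
spark.conf.set("spark.sql.shuffle.partitions", 32)
```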

 

My questions are:

  • Why is that happening?
  • Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
  • Is this a bug, and should I open a support ticket instead?

I hope you can help me.

Best regards,

Julia

1 REPLY

julia
New Contributor II

Hi,

The reply I received only restates the reasons why I am confused by the behaviour I observe in my job:

  • "Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."
    --> If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?
  • "Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion."
    --> Yes, my intention was that the overall job completion depends on the largest country. However, why are the remaining partitions blocked from executing during the same applyInPandas call?

I would appreciate it if someone could actually read through the post and give me feedback on whether there is anything I can do.
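In case it helps with reproducing the behaviour, here is a small diagnostic sketch (the table name is illustrative) that shows which shuffle partition each country is hashed into under the current settings; the groupBy that runs before applyInPandas should use the same hash partitioning on the grouping key:

```python
from pyspark.sql import functions as F

df = spark.table("my_db.training_data")  # illustrative; the partitioned Delta table

# repartition("Country") hash-partitions by the grouping key, similar to the shuffle
# Spark inserts in front of applyInPandas, using spark.sql.shuffle.partitions partitions.
mapping = (
    df.repartition("Country")
      .withColumn("spark_partition", F.spark_partition_id())
      .groupBy("Country", "spark_partition")
      .count()
)
mapping.show()  # one row per (country, shuffle partition) pair
```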
