parallel execution with applyinpandas on partitioned table

julia
New Contributor II

Hi,

I have a job that uses df.groupby(“Country”).applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.

It runs on a cluster with 4 workers (Chosen like this because the countries’ datasets are of different sizes – so while the largest country is running, the other workers can handle the remaining countries one after the other).

So each worker should take on 1 country and once it’s done, it should immediately start the next country.

The used data is  in a delta table on the hive_metastore.

 

So far, this worked as intended. But I’ve read that it is beneficial to partition the table by country (because we do all operations on a country level), so now we create the table like this: df.write.partitionBy("Country").saveAsTable(..)

 

In other steps of the project, this brings benefits, but for the tuning it causes issues:

  1. Only 3 countries start in parallel, so at least 1 worker is always idle
  2. When one worker is done, it is idle and waits until ALL workers are done -> we have to wait for largest country to finish and only then can continue.

My cluster settings are

  • Runtime 12.2 ML (because newer versions can lead to issues with garbage collection so that the cluster freezes)
  • spark.task.cpus 8 (number of cpus per worker)
  • spark.sql.execution.arrow.enabled true

 

I tried experimenting with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the number of cores).

This partially helped with issue 1, but issue 2 was still there, so it didn’t solve it.

 

My questions are:

  • Why is that happening?
  • Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
  • Is this a bug and should I rather open a ticket?

I hope you can help me.

Best regards,

Julia

julia
New Contributor II

Hi,

It only states the reasons why I am confused by the observed behaviour in my job:

  • "Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."
    --> If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?
  • "Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion."
    --> Yes, my intention was that the overall job completion depends on the largest country. However, why are the partitions blocked from executing during the same function call?

I would appreciate if someone could actually read through the post and give me feedback if there is anything I can do.