Parallel execution with applyInPandas on a partitioned table

julia

Hi,

I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning for 6 countries in parallel.
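
A minimal sketch of this kind of job, for reference only. Everything specific in it is an assumption: the input columns x and y, the polynomial-degree grid standing in for the real hyperparameter search, the output schema, and the names tune_country and run_tuning are all hypothetical. applyInPandas calls the function once per Country group, passing that group's rows as a pandas DataFrame, and the returned DataFrame has to match the declared schema.

```python
import numpy as np
import pandas as pd

# Hypothetical output schema: one row of tuning results per country.
RESULT_SCHEMA = "Country string, best_degree int, holdout_mse double"


def tune_country(pdf: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-country tuning: pick the polynomial degree with the
    lowest hold-out MSE. Spark calls this once per Country group."""
    pdf = pdf.sort_values("x")
    split = int(len(pdf) * 0.8)  # first 80 % of rows train, last 20 % hold-out
    train, test = pdf.iloc[:split], pdf.iloc[split:]

    best_degree, best_mse = None, float("inf")
    for degree in (1, 2, 3):  # hypothetical hyperparameter grid
        coeffs = np.polyfit(train["x"], train["y"], degree)
        pred = np.polyval(coeffs, test["x"])
        mse = float(np.mean((test["y"] - pred) ** 2))
        if mse < best_mse:
            best_degree, best_mse = degree, mse

    return pd.DataFrame(
        {
            "Country": [pdf["Country"].iloc[0]],
            "best_degree": [best_degree],
            "holdout_mse": [best_mse],
        }
    )


def run_tuning(df):
    """Run tune_country for every country on the cluster.
    `df` is assumed to be a Spark DataFrame with columns Country, x, y."""
    return df.groupby("Country").applyInPandas(tune_country, schema=RESULT_SCHEMA)
```

tune_country can be tried locally on a plain pandas DataFrame; run_tuning needs a Spark DataFrame and an active Spark session.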

It runs on a cluster with 4 workers. I chose 4 because the countries’ datasets differ in size: while the largest country is running, the other workers can process the remaining countries one after the other.

So each worker should take one country and, once it’s done, immediately start on the next one.
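
The intended behavior can be modelled as list scheduling: whichever worker becomes free first takes the next country. A small sketch, assuming hypothetical run times (in hours) for six countries named A to F; starting the longest countries first is an ordering chosen here for illustration, not something taken from the job itself.

```python
from __future__ import annotations

import heapq


def greedy_schedule(durations: dict[str, float], n_workers: int):
    """Each free worker immediately takes the next country (longest first).

    Returns the countries run by each worker and the overall finish time.
    """
    # Min-heap of (time at which the worker becomes free, worker id).
    free_at = [(0.0, w) for w in range(n_workers)]
    heapq.heapify(free_at)
    assignment = {w: [] for w in range(n_workers)}
    for country, dur in sorted(durations.items(), key=lambda kv: -kv[1]):
        start, worker = heapq.heappop(free_at)  # earliest-free worker
        assignment[worker].append(country)
        heapq.heappush(free_at, (start + dur, worker))
    makespan = max(t for t, _ in free_at)
    return assignment, makespan


# Hypothetical run times in hours.
durations = {"A": 10, "B": 4, "C": 3, "D": 3, "E": 2, "F": 2}
assignment, makespan = greedy_schedule(durations, n_workers=4)
print(assignment)  # {0: ['A'], 1: ['B'], 2: ['C', 'E'], 3: ['D', 'F']}
print(makespan)    # 10.0
```

With these numbers, worker 0 runs only A while the other three workers share B to F, and the whole run ends after 10 hours, i.e. exactly when the largest country finishes.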

The input data is stored in a Delta table in the hive_metastore.

So far, this worked as intended. But I’ve read that it is beneficial to partition the table by country (because we do all operations at the country level), so we now create the table like this: df.write.partitionBy("Country").saveAsTable(..)

In other steps of the project this brings benefits, but for the tuning it causes two issues:

  1. Only 3 countries start in parallel, so at least 1 worker is always idle.
  2. When one worker is done, it stays idle until ALL workers are done, so we have to wait for the largest country to finish before the job can continue.
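
Both symptoms look like what a fixed, up-front packing of countries into tasks would produce: each task processes its countries one after another on a single worker, and a worker that finishes its task has nothing left to pick up. A toy model of that situation, using hypothetical run times; the packing into three tasks below is an assumption chosen to reproduce the symptoms, not a statement about how Spark actually split this job.

```python
from __future__ import annotations


def static_schedule(tasks: list[list[str]], durations: dict[str, float], n_workers: int):
    """Toy model: countries are packed into fixed tasks before the run.

    Each task runs its countries sequentially on one worker, and a worker
    whose task is finished cannot pick up work from another task.
    Returns the overall finish time and the idle time of every worker.
    """
    if len(tasks) > n_workers:
        raise ValueError("this toy model assumes at most one task per worker")
    finish = [sum(durations[c] for c in task) for task in tasks]
    makespan = max(finish)
    # Early finishers wait for the slowest task (issue 2);
    # workers without a task idle for the whole run (issue 1).
    idle = [makespan - f for f in finish] + [makespan] * (n_workers - len(tasks))
    return makespan, idle


# Hypothetical run times in hours and a hypothetical packing into 3 tasks.
durations = {"A": 10, "B": 4, "C": 3, "D": 3, "E": 2, "F": 2}
tasks = [["A", "F"], ["B", "D"], ["C", "E"]]
makespan, idle = static_schedule(tasks, durations, n_workers=4)
print(makespan)  # 12
print(idle)      # [0, 5, 7, 12]
```

Here the run takes 12 hours although no single country needs more than 10, and the fourth worker never receives any work.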

My cluster settings are:

  • Runtime 12.2 ML (newer versions can cause garbage-collection issues that freeze the cluster)
  • spark.task.cpus 8 (the number of CPUs per worker)
  • spark.sql.execution.arrow.enabled true
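
With spark.task.cpus equal to the number of cores per worker, each worker can run only one task at a time, so the cluster has at most 4 task slots. A small calculation, assuming one executor per worker that owns all 8 of that worker's cores (the 8 comes from the spark.task.cpus setting above).

```python
def concurrent_task_slots(n_workers: int, cores_per_worker: int, task_cpus: int) -> int:
    """Tasks that can run at the same time: each executor fits
    cores // spark.task.cpus tasks (assumes one executor per worker)."""
    return n_workers * (cores_per_worker // task_cpus)


print(concurrent_task_slots(4, 8, 8))  # 4  -> at most 4 countries at once
print(concurrent_task_slots(4, 8, 1))  # 32 -> with the default spark.task.cpus=1
```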

I experimented with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores).

This partially helped with issue 1, but issue 2 remained.
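
spark.sql.shuffle.partitions sets how many partitions the groupby shuffle starts with (adaptive query execution may later merge small ones), and each row goes to the partition given by a hash of the grouping key modulo that number. With only six keys, two countries can land in the same partition, and therefore be processed one after another in the same task, whatever number is chosen. The sketch below illustrates this with zlib.crc32 as a stand-in hash; Spark uses a Murmur3-based hash, so the concrete placements will differ from Spark's, and the country keys are hypothetical.

```python
import zlib
from collections import defaultdict


def partition_of(key: str, num_partitions: int) -> int:
    """Hash partitioning: hash(key) mod num_partitions.

    Spark uses a Murmur3-based hash; crc32 is only a deterministic,
    dependency-free stand-in, so actual placements differ from Spark's.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def group_by_partition(keys, num_partitions: int) -> dict:
    """Map each partition id to the keys (countries) that share it."""
    groups = defaultdict(list)
    for key in keys:
        groups[partition_of(key, num_partitions)].append(key)
    return dict(groups)


countries = ["A", "B", "C", "D", "E", "F"]  # hypothetical country keys
for n in (4, 8, 32):
    groups = group_by_partition(countries, n)
    shared = {p: ks for p, ks in groups.items() if len(ks) > 1}
    print(f"{n} partitions: {len(groups)} non-empty, shared by several countries: {shared}")
```

With 4 partitions a collision is unavoidable for six countries; with 32 it only becomes less likely, which would be consistent with the setting helping only partially.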

My questions are:

  • Why is that happening?
  • Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
  • Is this a bug, and should I open a ticket instead?

I hope you can help me.

Best regards,

Julia