Parallel execution with applyInPandas on partitioned table
03-07-2024 02:31 AM
Hi,
I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.
It runs on a cluster with 4 workers, chosen because the countries' datasets differ in size: while the largest country is running, the other workers can handle the remaining countries one after the other.
So each worker should take on one country and, once it's done, immediately start the next one.
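For illustration, the per-country function that Spark calls is plain pandas code. This is only a minimal sketch of the pattern, not my real tuning code: the column name y, the toy "tuning" loop, and the function name tune_country are all placeholders.

```python
import pandas as pd

def tune_country(pdf: pd.DataFrame) -> pd.DataFrame:
    """Toy per-country 'tuning': pick the alpha whose shrunk mean
    minimizes the squared error on this country's data."""
    country = pdf["Country"].iloc[0]
    y = pdf["y"]
    best_alpha, best_err = None, float("inf")
    for alpha in (0.0, 0.1, 0.5, 1.0):
        pred = (1 - alpha) * y.mean()   # shrink the group mean toward 0
        err = ((y - pred) ** 2).sum()
        if err < best_err:
            best_alpha, best_err = alpha, err
    return pd.DataFrame({"Country": [country],
                         "best_alpha": [best_alpha],
                         "sse": [best_err]})

# Driver-side call on a Spark DataFrame `df` with the same columns:
# result = (df.groupby("Country")
#             .applyInPandas(tune_country,
#                            schema="Country string, best_alpha double, sse double"))
```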
The data is in a Delta table in the hive_metastore.
So far, this worked as intended. But I've read that it is beneficial to partition the table by country (because we do all operations at the country level), so now we create the table like this: df.write.partitionBy("Country").saveAsTable(..)
In other steps of the project, this brings benefits, but for the tuning it causes issues:
- Only 3 countries start in parallel, so at least 1 worker is always idle
- When a worker is done, it sits idle until ALL workers are done, so we have to wait for the largest country to finish before anything can continue.
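To illustrate what I suspect might be happening: group keys are hash-partitioned into a fixed number of shuffle partitions, and two countries can land in the same partition, where one task then processes them one after another. A toy stand-in (zlib.crc32 replaces Spark's Murmur3 hash here, so the exact bucket assignments differ from Spark's; the pigeonhole effect is the same):

```python
import zlib
from collections import defaultdict

countries = ["DE", "FR", "IT", "ES", "PL", "NL"]

def bucket(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for Spark's hash partitioner
    return zlib.crc32(key.encode()) % num_partitions

def partition_layout(num_partitions: int) -> dict:
    """Map each shuffle partition to the group keys that land in it."""
    layout = defaultdict(list)
    for c in countries:
        layout[bucket(c, num_partitions)].append(c)
    return dict(layout)

# With fewer partitions than groups, collisions are guaranteed;
# with more partitions, they remain possible.
print(partition_layout(4))
```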
My cluster settings are
- Runtime 12.2 ML (because newer versions can lead to garbage-collection issues that freeze the cluster)
- spark.task.cpus 8 (so one task gets all 8 CPUs of a worker)
- spark.sql.execution.arrow.enabled true
I tried experimenting with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the number of cores).
This partially helped with issue 1, but issue 2 remained, so it didn't solve the problem.
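Two further things I could try, sketched below. This assumes the per-group function is called tune_country and the output schema result_schema (hypothetical names); spark.sql.adaptive.coalescePartitions.enabled is the AQE setting that can merge several small shuffle partitions into a single task, which would then process multiple countries sequentially. It is a config sketch for the Databricks cluster, not a standalone script.

```python
# Stop AQE from coalescing the small country partitions into one task:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

# And/or force a fresh shuffle on the grouping key before the groupby,
# instead of relying on the table's storage partitioning:
result = (df.repartition("Country")
            .groupby("Country")
            .applyInPandas(tune_country, schema=result_schema))
```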
My questions are:
- Why is that happening?
- Does the partitioning make sense? If yes, can I somehow solve these 2 issues and still have the table partitioned?
- Is this a bug, and should I open a support ticket instead?
I hope you can help me.
Best regards,
Julia
03-08-2024 07:19 AM - last edited on 03-08-2024 10:25 AM by Retired_mod
Hi,
The answer only restates the very behaviour in my job that I am confused about:
- "Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."
--> If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?
- "Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country's processing time affects the overall job completion."
--> Yes, my intention was that overall job completion depends on the largest country. However, why are the partitions blocked from executing during the same function call?
I would appreciate it if someone could actually read through the post and give me feedback on whether there is anything I can do.