Hi,
I have a job that uses df.groupby("Country").applyInPandas(..) to run pandas-based hyperparameter tuning in parallel for 6 countries.
It runs on a cluster with 4 workers (chosen like this because the countries' datasets are of different sizes, so while the largest country is still running, the other workers can handle the remaining countries one after the other).
So each worker should take on one country and, once it's done, immediately start the next one.
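For context, the per-country function looks roughly like this. This is a minimal sketch with made-up names (tune_country, best_score, the toy data) standing in for the real tuning logic:

```python
import pandas as pd

# Result schema as a DDL string, so this sketch needs no pyspark import;
# the column names are illustrative, not the actual project schema.
RESULT_SCHEMA = "Country string, best_score double"

def tune_country(pdf: pd.DataFrame) -> pd.DataFrame:
    """Placeholder for the real hyperparameter search over one country's rows."""
    best = float(len(pdf))  # dummy "score"; real code would fit models here
    return pd.DataFrame({"Country": [pdf["Country"].iloc[0]],
                         "best_score": [best]})

# In the Spark job this runs once per country, ideally in parallel across workers:
# results = df.groupby("Country").applyInPandas(tune_country, schema=RESULT_SCHEMA)

# Local check on a toy pandas frame:
toy = pd.DataFrame({"Country": ["DE", "DE", "DE"], "value": [1, 2, 3]})
print(tune_country(toy))
```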
The data is in a Delta table in the hive_metastore.
So far, this worked as intended. But I've read that it is beneficial to partition the table by country (because we do all operations at the country level), so we now create the table like this: df.write.partitionBy("Country").saveAsTable(..)
In other steps of the project, this brings benefits, but for the tuning it causes issues:
- Only 3 countries start in parallel, so at least one worker is always idle.
- When a worker is done, it sits idle until ALL workers are done -> we have to wait for the largest country to finish and only then can we continue.
My cluster settings are
- Runtime 12.2 ML (because newer versions can lead to issues with garbage collection so that the cluster freezes)
- spark.task.cpus 8 (number of CPUs per worker)
- spark.sql.execution.arrow.enabled true
I tried experimenting with the setting spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores).
This partially helped with issue 1, but issue 2 remained, so it didn't solve the problem.
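To summarize, the relevant settings in the cluster's Spark config (entered as one key-value pair per line, including the shuffle-partition experiment) currently look like this:

```
spark.task.cpus 8
spark.sql.execution.arrow.enabled true
spark.sql.shuffle.partitions 32
```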
My questions are:
- Why is this happening?
- Does the partitioning make sense? If yes, can I somehow solve these two issues and still keep the table partitioned?
- Or is this a bug and should I rather open a support ticket?
I hope you can help me.
Best regards,
Julia