<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic parallel execution with applyinpandas on partitioned table in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/parallel-execution-with-applyinpandas-on-partitioned-table/m-p/62853#M3079</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have a job that uses &lt;EM&gt;df.groupby("Country").applyInPandas(..)&lt;/EM&gt; to run pandas-based hyperparameter tuning in parallel for 6 countries.&lt;/P&gt;&lt;P&gt;It runs on a cluster with 4 workers. I chose this size because the countries’ datasets differ in size, so while the largest country is running, the other workers can process the remaining countries one after another.&lt;/P&gt;&lt;P&gt;So each worker should take on 1 country and, once it is done, immediately start the next one.&lt;/P&gt;&lt;P&gt;The data is stored in a Delta table in the hive_metastore.&lt;/P&gt;&lt;P&gt;So far, this worked as intended. But I’ve read that it is beneficial to partition the table by country (because we do all operations at the country level), so we now create the table like this:&amp;nbsp;&lt;EM&gt;df.write.partitionBy("Country").saveAsTable(..)&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;This brings benefits in other steps of the project, but for the tuning it causes two &lt;STRONG&gt;issues&lt;/STRONG&gt;:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Only 3 countries start in parallel, so at least &lt;STRONG&gt;1 worker is always idle&lt;/STRONG&gt;.&lt;/LI&gt;&lt;LI&gt;When a worker finishes, it stays idle and &lt;STRONG&gt;waits until ALL workers are done&lt;/STRONG&gt; -&amp;gt; we have to wait for the largest country to finish before we can continue.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;My cluster settings are:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Runtime 12.2 ML (because newer versions can cause garbage-collection issues that freeze the cluster)&lt;/LI&gt;&lt;LI&gt;spark.task.cpus 8 (the number of CPUs per worker)&lt;/LI&gt;&lt;LI&gt;spark.sql.execution.arrow.enabled true&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I tried experimenting with spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores).&lt;/P&gt;&lt;P&gt;This partially helped with issue 1, but issue 2 remained, so it did not solve the problem.&lt;/P&gt;&lt;P&gt;My questions are:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Why is this happening?&lt;/LI&gt;&lt;LI&gt;Does the partitioning make sense? If yes, can I solve these 2 issues and still keep the table partitioned?&lt;/LI&gt;&lt;LI&gt;Is this a bug, and should I open a ticket instead?&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I hope you can help me.&lt;/P&gt;&lt;P&gt;Best regards,&lt;/P&gt;&lt;P&gt;Julia&lt;/P&gt;</description>
    <pubDate>Thu, 07 Mar 2024 10:31:15 GMT</pubDate>
    <dc:creator>julia</dc:creator>
    <dc:date>2024-03-07T10:31:15Z</dc:date>
    <item>
      <title>parallel execution with applyinpandas on partitioned table</title>
      <link>https://community.databricks.com/t5/machine-learning/parallel-execution-with-applyinpandas-on-partitioned-table/m-p/62853#M3079</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I have a job that uses &lt;EM&gt;df.groupby("Country").applyInPandas(..)&lt;/EM&gt; to run pandas-based hyperparameter tuning in parallel for 6 countries.&lt;/P&gt;&lt;P&gt;It runs on a cluster with 4 workers. I chose this size because the countries’ datasets differ in size, so while the largest country is running, the other workers can process the remaining countries one after another.&lt;/P&gt;&lt;P&gt;So each worker should take on 1 country and, once it is done, immediately start the next one.&lt;/P&gt;&lt;P&gt;The data is stored in a Delta table in the hive_metastore.&lt;/P&gt;&lt;P&gt;So far, this worked as intended. But I’ve read that it is beneficial to partition the table by country (because we do all operations at the country level), so we now create the table like this:&amp;nbsp;&lt;EM&gt;df.write.partitionBy("Country").saveAsTable(..)&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;This brings benefits in other steps of the project, but for the tuning it causes two &lt;STRONG&gt;issues&lt;/STRONG&gt;:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Only 3 countries start in parallel, so at least &lt;STRONG&gt;1 worker is always idle&lt;/STRONG&gt;.&lt;/LI&gt;&lt;LI&gt;When a worker finishes, it stays idle and &lt;STRONG&gt;waits until ALL workers are done&lt;/STRONG&gt; -&amp;gt; we have to wait for the largest country to finish before we can continue.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;My cluster settings are:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Runtime 12.2 ML (because newer versions can cause garbage-collection issues that freeze the cluster)&lt;/LI&gt;&lt;LI&gt;spark.task.cpus 8 (the number of CPUs per worker)&lt;/LI&gt;&lt;LI&gt;spark.sql.execution.arrow.enabled true&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I tried experimenting with spark.sql.shuffle.partitions, e.g. setting it to 32 (the total number of cores).&lt;/P&gt;&lt;P&gt;This partially helped with issue 1, but issue 2 remained, so it did not solve the problem.&lt;/P&gt;&lt;P&gt;My questions are:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Why is this happening?&lt;/LI&gt;&lt;LI&gt;Does the partitioning make sense? If yes, can I solve these 2 issues and still keep the table partitioned?&lt;/LI&gt;&lt;LI&gt;Is this a bug, and should I open a ticket instead?&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I hope you can help me.&lt;/P&gt;&lt;P&gt;Best regards,&lt;/P&gt;&lt;P&gt;Julia&lt;/P&gt;</description>
      <pubDate>Thu, 07 Mar 2024 10:31:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/parallel-execution-with-applyinpandas-on-partitioned-table/m-p/62853#M3079</guid>
      <dc:creator>julia</dc:creator>
      <dc:date>2024-03-07T10:31:15Z</dc:date>
    </item>
    <item>
      <title>Re: parallel execution with applyinpandas on partitioned table</title>
      <link>https://community.databricks.com/t5/machine-learning/parallel-execution-with-applyinpandas-on-partitioned-table/m-p/63073#M3089</link>
      <description>&lt;P&gt;&lt;SPAN&gt;Hi,&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The previous reply only restates the explanations that confuse me about the behaviour I observe in my job:&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;EM&gt;"Only 3 countries starting in parallel: This occurs because Spark assigns one worker per partition. If you have more partitions (countries) than workers, some workers remain idle."&lt;/EM&gt;&lt;BR /&gt;--&amp;gt; If Spark assigns one worker per partition and I have more partitions than workers, why would a worker remain idle?&lt;/LI&gt;
&lt;LI&gt;&lt;EM&gt;"Waiting for all workers to finish: Since Spark waits for all tasks (countries) to complete before proceeding, the largest country’s processing time affects the overall job completion."&lt;/EM&gt;&lt;BR /&gt;--&amp;gt; Yes, I intended the overall job completion time to depend on the largest country. However, why can the remaining partitions not start on the idle workers during the same applyInPandas call?&lt;/LI&gt;
&lt;/UL&gt;
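&lt;P&gt;To make the expected behaviour concrete, here is a small pure-Python simulation (not Spark code) that compares two schedules for six countries. The run times in DURATIONS are made-up numbers, not measurements from the job. Packing the countries round-robin into 3 tasks is only an assumed stand-in that mirrors the "only 3 countries start in parallel" observation. It is not how Spark actually assigns groups to tasks.&lt;/P&gt;
&lt;PRE&gt;
```python
import heapq

# Hypothetical tuning times per country in minutes (made-up numbers,
# not measured from the real job). Dict order is the processing order.
DURATIONS = {"A": 60, "B": 25, "C": 20, "D": 15, "E": 10, "F": 10}


def dynamic_schedule(durations, workers):
    """Expected behaviour: a worker that becomes free immediately
    picks up the next country. Returns the total run time (makespan)."""
    # Min-heap holding the time at which each worker becomes free.
    free_at = [0] * workers
    heapq.heapify(free_at)
    for duration in durations.values():
        start = heapq.heappop(free_at)  # earliest free worker
        heapq.heappush(free_at, start + duration)
    return max(free_at)


def packed_schedule(durations, tasks):
    """Assumed observed behaviour: countries are fixed to a smaller number
    of tasks up front, and each task runs its countries one after another.
    Returns the makespan, i.e. the run time of the longest task."""
    # Round-robin is an arbitrary stand-in, not Spark's real assignment.
    totals = [0] * tasks
    for i, duration in enumerate(durations.values()):
        totals[i % tasks] += duration
    return max(totals)


print("dynamic, 4 workers:", dynamic_schedule(DURATIONS, workers=4))
print("packed, 3 tasks:", packed_schedule(DURATIONS, tasks=3))
```
&lt;/PRE&gt;
&lt;P&gt;With these hypothetical numbers, the dynamic schedule finishes after 60 minutes, as soon as the largest country is done. The packed schedule takes 75 minutes, never uses the fourth worker, and leaves the two shorter tasks (35 and 30 minutes) idle until the longest one finishes.&lt;/P&gt;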
&lt;P&gt;I would appreciate it if someone could read through the post carefully and tell me whether there is anything I can do.&lt;/P&gt;</description>
      <pubDate>Fri, 08 Mar 2024 18:25:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/parallel-execution-with-applyinpandas-on-partitioned-table/m-p/63073#M3089</guid>
      <dc:creator>julia</dc:creator>
      <dc:date>2024-03-08T18:25:20Z</dc:date>
    </item>
  </channel>
</rss>

