<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic PySpark pandas_udf slower than single thread in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/pyspark-pandas-udf-slower-than-single-thread/m-p/17699#M11662</link>
    <description>&lt;P&gt;I used &lt;CODE&gt;@pandas_udf&lt;/CODE&gt; to write a function that speeds up parsing XML files, and then compared its speed with single-threaded code. Surprisingly, the &lt;CODE&gt;@pandas_udf&lt;/CODE&gt; version is two times slower than the single-threaded code. There are around 20,000 XML files to parse. The code below shows exactly what I did:&lt;/P&gt;
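&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EDA").getOrCreate()
# Arrow for toPandas(); pandas UDFs use Arrow regardless (Spark 3.x key: spark.sql.execution.arrow.pyspark.enabled)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Max rows per Arrow batch handed to each pandas UDF call (default 10000)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")
&lt;/CODE&gt;&lt;/PRE&gt;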
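&lt;P&gt;To see what &lt;CODE&gt;maxRecordsPerBatch=64&lt;/CODE&gt; implies for this job, here is a rough back-of-the-envelope sketch in plain Python. It assumes that &lt;CODE&gt;repartition(60)&lt;/CODE&gt; spreads the ~20,000 rows as evenly as possible and that each Arrow batch corresponds to one call of the UDF; &lt;CODE&gt;count_arrow_batches&lt;/CODE&gt; is a hypothetical helper, not a Spark API.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;
def count_arrow_batches(total_rows, num_partitions, max_records_per_batch):
    """Estimate how many Arrow batches (pandas UDF calls) a job produces."""
    # Assumption: rows are spread as evenly as possible across partitions,
    # so `extra` partitions hold one more row than the others
    base, extra = divmod(total_rows, num_partitions)

    def batches_for(rows):
        # Ceiling division: a partial batch still costs one UDF call
        return -(-rows // max_records_per_batch)

    return extra * batches_for(base + 1) + (num_partitions - extra) * batches_for(base)


print(count_arrow_batches(20000, 60, 64))     # 360
print(count_arrow_batches(20000, 60, 10000))  # 60 (default batch size)
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Smaller batches mean more UDF invocations, and each invocation pays its own Arrow serialization and Python call overhead.&lt;/P&gt;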
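&lt;PRE&gt;&lt;CODE&gt;import xml.etree.ElementTree as ET

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, IntegerType

# Series-to-Series pandas UDF: each call receives one Arrow batch of XML paths
@pandas_udf(ArrayType(ArrayType(IntegerType())))
def parse_xml(xml_names: pd.Series) -&amp;gt; pd.Series:
    results = []
    for xml_name in xml_names:  # plain Python loop, one file at a time
        # The path must be readable from the executors, not only from the driver
        tree = ET.parse(xml_name)
        root = tree.getroot()
        keep_boxes = []
        for obj in root.iter("object"):
            class_id = int(obj.find("name").text)
            boxes = obj.find("bndbox")
            xmin = int(boxes.find("xmin").text)
            ymin = int(boxes.find("ymin").text)
            xmax = int(boxes.find("xmax").text)
            ymax = int(boxes.find("ymax").text)
            keep_boxes.append([class_id, xmin, ymin, xmax, ymax])
        results.append(keep_boxes)
    # One list of [class_id, xmin, ymin, xmax, ymax] boxes per input file
    return pd.Series(results)
&lt;/CODE&gt;&lt;/PRE&gt;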
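&lt;P&gt;One way to check whether the per-file parsing itself or the Spark overhead dominates is to pull the body of the loop in &lt;CODE&gt;parse_xml&lt;/CODE&gt; out into a plain function that can be run and timed without Spark. The sketch below does that with the standard library only; &lt;CODE&gt;boxes_from_root&lt;/CODE&gt; and &lt;CODE&gt;make_annotation&lt;/CODE&gt; are hypothetical helpers, and the in-memory tree assumes the same &lt;CODE&gt;object/name&lt;/CODE&gt; and &lt;CODE&gt;object/bndbox&lt;/CODE&gt; layout that &lt;CODE&gt;parse_xml&lt;/CODE&gt; reads.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;
import xml.etree.ElementTree as ET


def make_annotation(objects):
    """Hypothetical helper: build an annotation tree in memory (no files needed)."""
    root = ET.Element("annotation")
    for class_id, xmin, ymin, xmax, ymax in objects:
        obj = ET.SubElement(root, "object")
        ET.SubElement(obj, "name").text = str(class_id)
        bndbox = ET.SubElement(obj, "bndbox")
        for tag, value in zip(("xmin", "ymin", "xmax", "ymax"), (xmin, ymin, xmax, ymax)):
            ET.SubElement(bndbox, tag).text = str(value)
    return root


def boxes_from_root(root):
    """Same per-file logic as parse_xml, testable without Spark."""
    keep_boxes = []
    for obj in root.iter("object"):
        class_id = int(obj.find("name").text)
        bndbox = obj.find("bndbox")
        coords = [int(bndbox.find(tag).text) for tag in ("xmin", "ymin", "xmax", "ymax")]
        keep_boxes.append([class_id] + coords)
    return keep_boxes


root = make_annotation([(3, 10, 20, 110, 220), (7, 5, 5, 50, 60)])
print(boxes_from_root(root))  # [[3, 10, 20, 110, 220], [7, 5, 5, 50, 60]]
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Inside the UDF, the loop body would then reduce to appending &lt;CODE&gt;boxes_from_root(ET.parse(xml_name).getroot())&lt;/CODE&gt;, so exactly the same logic runs in both the Spark and the single-threaded versions.&lt;/P&gt;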
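&lt;PRE&gt;&lt;CODE&gt;import numpy as np
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructField, StructType

# Collect all (img_name, xml_name) pairs from the different folders;
# get_data() is not shown in this post
datas = np.array(get_data()).T.tolist()
schema = StructType([
    StructField("img_name", StringType(), True),
    StructField("xml_name", StringType(), True)])
num_cores = 20  # number of cores I have
muls = 3
# 3 partitions per core = 60 partitions
df = spark.createDataFrame(datas, schema).repartition(muls * num_cores)
# toPandas() triggers the job and collects every result back to the driver
pdf_box = df.select(col("img_name"), parse_xml(col("xml_name")).alias("boxes")).toPandas()
&lt;/CODE&gt;&lt;/PRE&gt;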
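&lt;P&gt;For a like-for-like comparison, both versions can be timed with the same wall-clock helper. This is a minimal standard-library sketch; &lt;CODE&gt;time_serial&lt;/CODE&gt; and &lt;CODE&gt;time_action&lt;/CODE&gt; are hypothetical helpers, and &lt;CODE&gt;func&lt;/CODE&gt; stands for whatever function parses one XML file. Because Spark transformations are lazy, &lt;CODE&gt;parse_xml&lt;/CODE&gt; only runs when an action such as &lt;CODE&gt;toPandas()&lt;/CODE&gt; executes, so the Spark timing has to wrap that whole call.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;
import time


def time_serial(func, items):
    """Apply func to each item on a single core; return (results, seconds)."""
    start = time.perf_counter()
    results = [func(item) for item in items]
    elapsed = time.perf_counter() - start
    return results, elapsed


def time_action(action):
    """Time a zero-argument callable, e.g. lambda: df.select(...).toPandas()."""
    # The callable must include the Spark action, otherwise nothing is executed
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


squares, seconds = time_serial(lambda n: n * n, range(5))
print(squares)  # [0, 1, 4, 9, 16]
&lt;/CODE&gt;&lt;/PRE&gt;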
&lt;P&gt;As far as I know, because I use a for loop, the main advantage of &lt;CODE&gt;pandas_udf&lt;/CODE&gt; (vectorized processing of a whole batch at once) is lost. However, I would still expect it to be faster than single-threaded code, since &lt;CODE&gt;Spark&lt;/CODE&gt; splits the data into partitions and processes them in parallel. If this understanding is wrong, please correct me.&lt;/P&gt;
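&lt;P&gt;The expectation that parallel partitions should beat a single thread can be made more concrete with Amdahl's law, a general model that is not specific to Spark: if a fraction &lt;CODE&gt;p&lt;/CODE&gt; of the work parallelizes perfectly over &lt;CODE&gt;n&lt;/CODE&gt; cores, the best-case speedup is 1 / ((1 - p) + p / n). The sketch below adds a hypothetical fixed overhead term in seconds (standing in for things like job scheduling, Python worker startup, Arrow serialization, and collecting results with &lt;CODE&gt;toPandas()&lt;/CODE&gt;); all inputs are placeholders, not measurements from this job.&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;
def amdahl_speedup(parallel_fraction, cores):
    """Classic Amdahl's law: best-case speedup on `cores` workers."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / cores)


def estimated_speedup(serial_seconds, cores, overhead_seconds, parallel_fraction=1.0):
    """Speedup vs. one thread when the parallel run also pays a fixed overhead."""
    # Hypothetical model: parallel wall time = Amdahl time + constant overhead
    parallel_seconds = serial_seconds / amdahl_speedup(parallel_fraction, cores)
    return serial_seconds / (parallel_seconds + overhead_seconds)


print(amdahl_speedup(1.0, 20))         # 20.0
print(estimated_speedup(60, 20, 0))    # 20.0
print(estimated_speedup(60, 20, 117))  # 0.5
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;With these made-up numbers, a 60-second serial job that would ideally take 3 seconds on 20 cores ends up two times slower once a fixed overhead of 117 seconds is added, which illustrates how overhead rather than the parallel work itself can dominate a short job.&lt;/P&gt;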
&lt;P&gt;So I would like to know why it is even slower than the single-threaded code. Is it because of the code I wrote, or am I missing some important idea? Thanks!&lt;/P&gt;</description>
    <pubDate>Thu, 15 Jul 2021 03:00:55 GMT</pubDate>
    <dc:creator>twotwoiscute</dc:creator>
    <dc:date>2021-07-15T03:00:55Z</dc:date>
    <item>
      <title>PySpark pandas_udf slower than single thread</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-pandas-udf-slower-than-single-thread/m-p/17699#M11662</link>
      <pubDate>Thu, 15 Jul 2021 03:00:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-pandas-udf-slower-than-single-thread/m-p/17699#M11662</guid>
      <dc:creator>twotwoiscute</dc:creator>
      <dc:date>2021-07-15T03:00:55Z</dc:date>
    </item>
  </channel>
</rss>

