Data Engineering

PySpark pandas_udf slower than single thread

twotwoiscute
New Contributor

I used @pandas_udf to write a function to speed up the process (parsing XML files) and then compared its speed with a single-threaded version. Surprisingly, using @pandas_udf is two times slower than the single-threaded code, and the number of XML files I need to parse is around 20,000. The code below shows exactly what I did:

import xml.etree.ElementTree as ET

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("EDA").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

@pandas_udf(ArrayType(ArrayType(IntegerType())))
def parse_xml(xml_names: pd.Series) -> pd.Series:
    # For each XML path in the batch, collect [class_id, xmin, ymin, xmax, ymax]
    # for every <object> element in the file.
    results = []
    for xml_name in xml_names:
        tree = ET.parse(xml_name)
        root = tree.getroot()
        keep_boxes = []
        for obj in root.iter("object"):
            class_id = int(obj.find("name").text)
            boxes = obj.find("bndbox")
            xmin = int(boxes.find("xmin").text)
            ymin = int(boxes.find("ymin").text)
            xmax = int(boxes.find("xmax").text)
            ymax = int(boxes.find("ymax").text)
            keep_boxes.append([class_id, xmin, ymin, xmax, ymax])
        results.append(keep_boxes)
    return pd.Series(results)

# Collect all (img_name, xml_name) pairs from the different folders.
datas = np.array(get_data()).T.tolist()
schema = StructType([
    StructField("img_name", StringType(), True),
    StructField("xml_name", StringType(), True)])
num_cores = 20  # number of cores I have
muls = 3
df = spark.createDataFrame(datas, schema).repartition(muls * num_cores)
pdf_box = df.select(col("img_name"), parse_xml(col("xml_name")).alias("boxes")).toPandas()
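
As a sanity check (just a sketch, not something I have benchmarked as part of the run above), I can at least confirm that the rows really are spread across partitions before the UDF runs:

# Sketch: inspect how the ~20,000 rows are distributed after repartition().
print("partitions:", df.rdd.getNumPartitions())                  # expect muls * num_cores = 60
print("rows per partition:", df.rdd.glom().map(len).collect())   # rough check for skew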

As far as I know, since I use a for loop, the usual advantage of pandas_udf is gone, because the batch is not really processed as a whole (vectorised) at once. However, I still expect it to be faster than a single thread, since Spark breaks the data into partitions and processes them in parallel. If this understanding is wrong, please correct me.
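
For comparison, here is roughly how I would express the same thing with mapInPandas (Spark 3.0+), which makes the batch-at-a-time iteration explicit. This is only a sketch I have not benchmarked, and parse_one is a helper I am introducing here, not something from my original code; the per-file parsing loop inside it is unchanged, so this does not add any vectorisation, it just restates the batching:

import xml.etree.ElementTree as ET

def parse_one(xml_name):
    # Same per-file parsing as inside parse_xml above.
    root = ET.parse(xml_name).getroot()
    boxes = []
    for obj in root.iter("object"):
        b = obj.find("bndbox")
        boxes.append([int(obj.find("name").text),
                      int(b.find("xmin").text), int(b.find("ymin").text),
                      int(b.find("xmax").text), int(b.find("ymax").text)])
    return boxes

def parse_xml_batches(batches):
    # batches is an iterator of pandas DataFrames, one per Arrow batch.
    for pdf in batches:
        pdf = pdf.copy()
        pdf["boxes"] = [parse_one(name) for name in pdf["xml_name"]]
        yield pdf[["img_name", "boxes"]]

pdf_box = df.mapInPandas(
    parse_xml_batches,
    "img_name string, boxes array<array<int>>").toPandas()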

So I would like to know why it is even slower than the single-threaded code. Is it because of the code I wrote, or is there some important idea that I just missed? Thanks!
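
P.S. In case it matters for the comparison, this is the kind of side-by-side timing I had in mind (again just a sketch; parse_one is the helper from the sketch above, and datas, df, and parse_xml are as defined earlier):

import time

t0 = time.perf_counter()
baseline = [parse_one(xml_name) for _, xml_name in datas]   # plain single-threaded loop on the driver
t1 = time.perf_counter()
pdf_box = df.select(col("img_name"), parse_xml(col("xml_name")).alias("boxes")).toPandas()
t2 = time.perf_counter()
print(f"single thread: {t1 - t0:.1f}s   pandas_udf + toPandas: {t2 - t1:.1f}s")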

1 REPLY

Kaniz
Community Manager

Hi @twotwoiscute! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers on the forum have an answer to your question first; if not, I will follow up shortly with a response.
