cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Bad performance UDFs functions

SaraCorralLou
New Contributor III

Hello,

I am contacting you because I am having a problem with the performance of my notebooks on databricks.

My notebook is written in python (pypark) in it I read a delta table that I copy to a dataframe and do several transformations and create several columns (I have several different dataframes created for it). For the creation of several coumnas I have created a function in my library that takes the initial dataframe as input and returns the same dataframe with the new column.

I can't avoid using the functions but the execution time of my notebook has increased considerably.

I have read that they are executed on the driver node of a Spark application, rather than on the worker nodes where data is distributed and this is the reason why they are slower.

This is the configuration of my cluster:

SaraCorralLou_0-1692357805407.png

The driver node is quite large and I have also added the configuration: spark.databricks.io.cache.enabled true.

I am working with approximately 15 million records.

Is there anything I can do to improve this performance? Any additional settings I'm missing that would help my notebook run faster? Right now it is taking an hour and a half.

Thank you so much in advanced.

7 REPLIES 7

Tharun-Kumar
Honored Contributor II
Honored Contributor II

@SaraCorralLou 

You can use Memory Profiler to profile your UDF. This will help us to understand which part of the UDF is leading to high memory utilization and has multiple calls during the execution.

https://www.databricks.com/blog/2022/11/30/memory-profiling-pyspark.html

SaraCorralLou
New Contributor III

Hi, 

Sorry for the delay of my answer. 

I installed the library and I run my code and I couldn't see any problem in the UDF functions. The biggest one is running in 5 mins. The problem is when we write the final dataframe in the delta table (with the command  

write.format("delta")... ). It's running for more than 30 minutes. So, the schema of our notebook is:
 
df = spark.read.table(source_table_with_15millon_records)
 
df2 = udf_function_to_add_columns(df)
df3 = udf_function_to_add_columns(df2)
df4 = udf_function_to_add_columns(df3)
...
df10.write.format("delta")
 
The final dataframe (df10 in the example) has the same number of rows but with extra columns.
 

Could it be that databricks does some recalculation at the end of the process and that is why it is taking so long?

Thank you very much!

-werners-
Esteemed Contributor III

why do you use a UDF to add columns?  You could write a pyspark function without registering it as a UDF.

A UDF will bypass all optimizations and, when you use Python, performance is bad.

Can you share your UDF code?

SaraCorralLou
New Contributor III

In our original dataframe we have a column in OriginalCurrency and we use the function "convert_to_euros" to convert to euros this amount. The rates are in another delta table:

SaraCorralLou_0-1693226028337.png

This one is the simplest function that we have, as the rest have a lot of business logic.

We could avoid using the functions in some cases (like the one I show you) but since we use this logic in many different notebooks we prefer to have the function and create unit tests on them in order to have a less repetitive code.

But as you can see in our functions we only use basic pyspark code, we don't do collects or displays (which we have read are very bad for performance). At most we do some grouping in some dataframe but nothing really weird.

-werners-
Esteemed Contributor III

ok, so it is not registered as a UDF, which is good.

Looking at your code, nothing special there.  The reason why the performance is so bad is not clear to me.
It could be the table distribution (of source table or rates table), perhaps data skew?
If the rates table is small, it should be broadcasted so the join would go pretty fast.

I'd take a look into the spark UI and query plan what exactly is going on.  Because your code seems fine to me, as long as you don't apply a loop over records (I hope not)?

SaraCorralLou
New Contributor III

We just apply a loop over record in another function. Before we used collect (for row in df.collect()) to loop over them but we changed it to use localIterator (for row in df.rdd.toLocalIterator()) we read that is better for performance. Are we doing it in the correct way?

-werners-
Esteemed Contributor III

looping over records is a performance killer.  To be avoided at all costs.

beware the for-loop (databricks.com)

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.