Writing a transformed DataFrame to a persistent table is unbearably slow

data_boy_2022
New Contributor III

I want to transform a DataFrame with a simple UDF and then store the resulting DataFrame in a new table (see the code below):

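from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructField, StructType

key = "test_key"

# The UDF returns a struct with three string fields
schema = StructType([
    StructField("xxx", StringType(), True),
    StructField("yyy", StringType(), True),
    StructField("zzz", StringType(), True),
])

def custom_udf(data):
    ...
    return ("test", "test", "test")

customUDF = udf(custom_udf, schema)

# big_df already exists (~170 million rows)
result_df = big_df.filter(col("key") == key).withColumn("result", customUDF(col("data")))
# Flatten the struct into top-level columns, then drop the struct itself
result_df = result_df.select("*", col("result.*")).drop(col("result"))
print(result_df.count())
test_df = result_df.collect()  # returns a Python list of Row objects
result_df.show(5)              # show() prints directly; a list has no show()
result_df.write.saveAsTable("default.test_table")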

I'm running into performance problems and would like to understand where they come from and how to deal with them.

big_df has ~170 million rows.

result_df has ~18 million rows.

print(result_df.count()) runs quickly (about 1 second).

The remaining three statements (collect(), show() and saveAsTable()) are very, very slow: together they took 90 minutes!
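The 90 minutes cover three different actions, so the measurement alone does not say which one is slow. A minimal sketch for timing each action on its own; time_action is a hypothetical helper, not part of PySpark. Each action triggers its own Spark job(s), which can also be inspected in the Spark UI.

```python
import time


def time_action(label, action):
    """Run a zero-argument callable (e.g. one Spark action) and report its wall-clock time."""
    start = time.perf_counter()
    result = action()  # the Spark job runs here, not when the DataFrame was defined
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.1f} s")
    return result, elapsed


# Hypothetical usage against the DataFrame from the question:
# time_action("count", result_df.count)
# time_action("collect", result_df.collect)  # pulls every row to the driver
# time_action("show", lambda: result_df.show(5))
# time_action("write", lambda: result_df.write.saveAsTable("default.test_table"))
```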

At first I thought the UDF was slowing down the transformation. But I assumed the transformation had already been executed when I called count(), so I have no intuition for why the rest is so slow.

How can I speed up writing to a table? How can I better debug the issue to solve it myself next time?
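One way to check whether count() actually ran the UDF is to compare physical plans with DataFrame.explain() (the mode argument needs Spark 3.0+). Spark evaluates DataFrames lazily and, unless a DataFrame is cached, generally recomputes its plan for every action, so a fast count() does not mean later actions can reuse its work. The sketch assumes that a plain Python UDF appears in the physical plan as a BatchEvalPython node (ArrowEvalPython for pandas or Arrow UDFs); plan_text and python_udf_in_plan are hypothetical helper names. If the UDF node is missing from the count() plan, Spark has likely pruned the unused result columns and never called the UDF there.

```python
import contextlib
import io

# Physical-plan operators assumed to mark Python UDF evaluation:
# BatchEvalPython for plain udf() functions, ArrowEvalPython for pandas/Arrow UDFs.
PYTHON_UDF_NODES = ("BatchEvalPython", "ArrowEvalPython")


def plan_text(df, mode="formatted"):
    """Capture what DataFrame.explain() prints as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()


def python_udf_in_plan(df):
    """Return True if the physical plan contains a Python UDF evaluation step."""
    text = plan_text(df)
    return any(node in text for node in PYTHON_UDF_NODES)


# Hypothetical usage with the DataFrame from the question:
# python_udf_in_plan(result_df.groupBy().count())  # roughly the plan count() runs
# python_udf_in_plan(result_df)                     # the plan collect()/write run
```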

EDIT:

Ingesting CSV data with the streaming Auto Loader and storing it as a Delta table takes only seconds, so I don't understand why writing this DataFrame to a table is so slow.

Accepted Solution

Tian
New Contributor III

Hi! Generally speaking, it's good practice to avoid the collect() action unless you absolutely need it, because collect() retrieves all the elements of the RDD/DataFrame/Dataset from every node to the driver node. If the dataset is large enough, you might even run into out-of-memory errors.

In your pipeline, you're reading the data, performing ETL, and then writing the data directly to object storage (in this case, DBFS), so you don't need the collect() operation here.
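Following this advice, a minimal sketch of the pipeline without collect(): write the table once, then run count() and a small preview against the saved table, so the UDF is evaluated only once, on the executors. The function name write_and_inspect is hypothetical, and mode("overwrite") is an assumption added so the sketch can be re-run (by default saveAsTable fails if the table already exists).

```python
def write_and_inspect(spark, result_df, table_name="default.test_table", n=5):
    """Write result_df once, then inspect the saved table instead of calling collect()."""
    # The write is the only action over result_df, so the UDF runs once here.
    # mode("overwrite") is an assumption; drop it to keep the default error-if-exists behaviour.
    result_df.write.mode("overwrite").saveAsTable(table_name)

    saved = spark.table(table_name)
    # These actions read the stored table rather than re-running the UDF.
    row_count = saved.count()
    # take(n) brings only n rows back to the driver, unlike collect().
    preview = saved.take(n)
    return row_count, preview


# Hypothetical usage:
# row_count, preview = write_and_inspect(spark, result_df)
```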

Vidula
Honored Contributor

Hello @Jan R

Hope all is well! Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
