08-19-2022 01:51 PM
I want to transform a DF with a simple UDF. Afterwards I want to store the resulting DF in a new table (see code below)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructField, StructType

key = "test_key"
schema = StructType([
    StructField("***", StringType(), True),
    StructField("yyy", StringType(), True),
    StructField("zzz", StringType(), True),
])

def custom_udf(data):
    ...
    return ("test", "test", "test")

customUDF = udf(custom_udf, schema)
result_df = big_df.filter(col("key") == key).withColumn("result", customUDF(col("data")))
result_df = result_df.select("*", col("result.*")).drop(col("result"))
print(result_df.count())
test_df = result_df.collect()
print(test_df.show(5))
result_df.write.saveAsTable("default.test_table")
I have performance issues and would like to understand where they come from and how to deal with them.
big_df has ~170 million entries.
result_df has ~18 million entries.
print(result_df.count()) runs quickly (1 second).
The three statements after it (collect(), show(5) and saveAsTable()) are very slow (they took 90 minutes!).
At first I thought the UDF was slowing down the transformation. But I assumed the transformation had already been executed when I called count(), so I have no intuition for why the rest is so slow.
How can I speed up writing to a table? How can I better debug the issue to solve it myself next time?
EDIT:
Ingesting CSV data with the streaming Auto Loader and storing it as a Delta table takes only seconds. So I don't understand why writing a DataFrame to a table is so slow.
Labels: Dataframe, Performance Issues, Result, Slow
Accepted Solutions
08-23-2022 02:59 PM
Hi! Generally speaking, it's good practice to avoid the collect() action unless you absolutely need it, because collect() retrieves all the elements of the RDD/DataFrame/Dataset from all nodes to the driver node. If the dataset is large enough, you might even run into out-of-memory issues.
In your pipeline, you're reading the data, performing ETL, and then writing the result directly to object storage (in this case, DBFS), so you don't need the collect() operation here.
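To make the suggestion concrete, here is a minimal sketch of the last steps without collect(). The helper name preview_and_save and its parameters are hypothetical and not from the original post; it only uses the DataFrame methods show(), explain() and write.saveAsTable(). One likely reason count() returned so quickly is that Spark can skip evaluating columns the count does not need, so the UDF may run in full for the first time only when rows are collected or written.

```python
def preview_and_save(result_df, table_name, preview_rows=5):
    """Preview a few rows and write the DataFrame without collecting it.

    `result_df` is expected to be a Spark DataFrame; the function name and
    its parameters are illustrative assumptions, not part of the original post.
    """
    # show() is called on the DataFrame itself (not on the list returned by
    # collect()) and prints only `preview_rows` rows on the driver.
    result_df.show(preview_rows)

    # explain() prints the physical plan without running the job, which helps
    # to see where the filter and the UDF sit before the expensive write.
    result_df.explain()

    # The write is executed on the cluster; the rows are not pulled back
    # to the driver first.
    result_df.write.saveAsTable(table_name)
```

With the DataFrame from the question, this would be called as preview_and_save(result_df, "default.test_table"). The Spark UI then shows the write as its own job, which makes it easier to tell whether the time goes into the UDF stage or into the write itself.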
09-11-2022 11:48 PM
Hello @Jan R
Hope all is well! Just wanted to check whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!