I want to transform a DataFrame with a simple UDF and afterwards store the resulting DataFrame in a new table (see the code below):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructField, StructType

key = "test_key"

schema = StructType([
    StructField("xxx", StringType(), True),
    StructField("yyy", StringType(), True),
    StructField("zzz", StringType(), True),
])

def custom_udf(data):
    ...
    return ("test", "test", "test")

customUDF = udf(custom_udf, schema)

result_df = big_df.filter(col("key") == key).withColumn("result", customUDF(col("data")))
result_df = result_df.select("*", col("result.*")).drop(col("result"))

print(result_df.count())
test_df = result_df.collect()
result_df.show(5)
result_df.write.saveAsTable("default.test_table")
I have performance issues and would like to better understand where they come from and how to deal with them.
big_df has ~170 million rows.
result_df has ~18 million rows.
print(result_df.count()) runs quickly (about 1 second).
The collect(), show(5) and saveAsTable lines are very, very slow (they took 90 minutes!).
At first I thought the UDF was slowing down the transformation, but I assumed the transformation had already been executed once I called count(). I don't have an intuition for why the rest is so slow.
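If I understand Spark's lazy evaluation correctly, count() can be answered without ever evaluating the UDF column, because Spark prunes columns the action does not need, so the UDF work may only really start at collect()/show()/write. A minimal sketch of what I could do to force the UDF to run before timing the later steps (this just reuses result_df from above):

# Cache and count: counting a cached DataFrame materializes all columns,
# including the UDF output, so the later actions read from the cache.
result_df = result_df.cache()
print(result_df.count())   # this run now pays the UDF cost
result_df.show(5)          # served from the cached data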
How can I speed up writing to a table? How can I better debug the issue to solve it myself next time?
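For debugging, I assume the usual tools are explain() and the Spark UI, but I am not sure what to look for:

# Print the logical and physical plans to see where the filter and the
# UDF actually sit in the query.
result_df.explain(True)
# In the Spark UI, each action (count, collect, show, the write) shows up
# as its own job with its own stages and task times.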
EDIT:
Ingesting CSV data with the streaming Auto Loader and storing it as a Delta table takes only seconds, so I don't understand why writing a DataFrame to a table is so slow.
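For completeness, this is how I would write the result explicitly as a Delta table (assuming Delta is the intended table format here, like in the Auto Loader pipeline):

# Sketch: write the result as a managed Delta table instead of relying
# on whatever format saveAsTable defaults to.
(result_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.test_table"))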