I'm having an issue getting UDFs to work within a DLT pipeline when the UDF is defined outside the notebook and calls other functions. The end goal is to put unit test coverage around the various functions, hence this pattern.
For test purposes I created a couple of simple UDFs in a file, udf.py:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# First UDF that reverses a string
def reverse_string(x):
    return x[::-1]

reverse_udf = udf(reverse_string, StringType())

# Second UDF that calls the first function and then converts the result to uppercase
def upper_and_reverse_string(x):
    reversed_string = reverse_string(x)  # calls the plain Python function, not the wrapped UDF
    return reversed_string.upper()

upper_reverse_udf = udf(upper_and_reverse_string, StringType())
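For reference, the kind of unit test coverage I want around these plain functions looks roughly like this (a sketch assuming pytest; the file name test_udf.py is illustrative):

# test_udf.py (illustrative name) - pytest-style tests against the plain functions,
# no Spark session required since we test the functions, not the wrapped UDFs
from udf import reverse_string, upper_and_reverse_string

def test_reverse_string():
    assert reverse_string("abc") == "cba"

def test_upper_and_reverse_string():
    assert upper_and_reverse_string("abc") == "CBA"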
When I attempt to use upper_reverse_udf inside a DLT table definition, I keep getting a broken pipe (UDF_PYSPARK_ERROR.UNKNOWN) error:
import dlt
from pyspark.sql.functions import col
from udf import *

@dlt.table
def test_transformed_table():
    df = dlt.read("test_table").select("system")
    return df.withColumn("processed_systems", upper_reverse_udf(col("system")))
The code works fine outside of DLT; the issue only occurs within a DLT pipeline:
from pyspark.sql.functions import col
from udf import *

df = spark.read.table("test_table").select("system")
df = df.withColumn("processed_systems", upper_reverse_udf(col("system")))
display(df)
I'm not seeing anything useful in the logs.
The runtime version for the pipeline is "dlt:15.4.9-delta-pipelines-dlt-release-dp-2025.07-rc0-commit-7005556-image-9d7698a".
If I embed all the code into a single function it works, but I lose the ability to reuse functions across different UDFs, as shown in the sketch below.
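For completeness, this is roughly what the working single-function variant looks like (a sketch; the inlined function is the same logic as above with the helper folded in):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Everything inlined into one function: the UDF body no longer references
# another module-level function, so nothing extra has to be shipped to the
# workers, but the reversal logic can't be reused or unit tested on its own.
def upper_and_reverse_string(x):
    return x[::-1].upper()

upper_reverse_udf = udf(upper_and_reverse_string, StringType())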