I am migrating a large codebase to PySpark on Azure Databricks, using DLT pipelines. It is important that the code stays modular, so for the time being I am looking to use UDFs that rely on my own modules and classes.
I am receiving the following error:
org.apache.spark.SparkRuntimeException: [UDF_ERROR.PAYLOAD] Execution of function <lambda>(MYCOLUMN_NAME#15312) failed - failed to set payload
== Error ==
INVALID_ARGUMENT: No module named 'mymodule'
== Stacktrace ==
This is the code involved (anonymized to create a minimal working example):
# demo.py
import dlt
from pyspark.sql.functions import col

import mymodule

# Module-level objects; the UDF is created when Helper is instantiated
demodata = mymodule.DemoData("EX")
helper = mymodule.Helper(demodata)


@dlt.table(name="DEMO")
def table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")  # Auto Loader reading Parquet files
        .load("abfss://...")
        .withColumn("DEMO", helper.transform(col("MYCOLUMN_NAME")))
    )
# mymodule.py
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


class DemoData:
    def __init__(self, suffix):
        self.suffix = suffix


class Helper:
    def __init__(self, demoData):
        _suffix = demoData.suffix
        # The UDF is built here; the lambda closes over self and _suffix
        self.transform = udf(lambda _string: self.helper(_string, _suffix), StringType())

    @staticmethod
    def helper(string, suffix):
        return string + suffix
Can someone help me understand what is happening? My thinking is that the Spark workers cannot see my module. Is that correct? And how would I use UDFs together with modular code? I understand this might not be the ideal approach, but I want to understand the technicality.
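For context, here is a minimal sketch of the direction I have been considering, assuming the problem really is that the workers cannot import mymodule. The idea is to build the UDF at module level in the pipeline file from a plain function, so the closure only captures a string and not a class instance, and to append the module's folder to sys.path so the driver-side import works. The file name, the /Workspace/Repos/... path and the _transform / transform_udf names are just illustrative, and I do not know whether the sys.path change propagates to the workers, which is part of what I am asking:

# demo_alternative.py -- a sketch, not my working pipeline
import sys

# Placeholder path to where mymodule lives; not my real layout
sys.path.append("/Workspace/Repos/my_repo/src")

import dlt
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

import mymodule

# Use the module only on the driver to derive plain values
demodata = mymodule.DemoData("EX")
_suffix = demodata.suffix


def _transform(value):
    # Plain function: only the string _suffix is captured, no class instance
    return value + _suffix


transform_udf = udf(_transform, StringType())


@dlt.table(name="DEMO")
def table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("abfss://...")
        .withColumn("DEMO", transform_udf(col("MYCOLUMN_NAME")))
    )

Would something along these lines be the expected pattern for keeping UDF code modular in DLT, or is there a proper mechanism for making the module importable on the workers?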