Data Engineering

Using PySpark Databricks UDFs with outside function imports

tp992
New Contributor II

Problem with minimal example

The minimal example below runs within a Databricks workspace but does not run locally with databricks-connect==15.3.

main.py

from databricks.connect import DatabricksSession

from module.udf import send_message, send_complex_message

spark = DatabricksSession.builder.getOrCreate()


def main_works():
    # UDF that does not call any other module-level function: runs fine locally.
    df = spark.createDataFrame([
        (1, "James"),
        (2, "John"),
    ], ["id", "name"])

    df = df.withColumn("message", send_message("name"))

    df.show()


def main_fails():
    # UDF that calls a helper defined in module.udf: fails under databricks-connect.
    df = spark.createDataFrame([
        (1, "James"),
        (2, "John"),
    ], ["id", "name"])

    df = df.withColumn("message", send_complex_message("name"))

    df.show()

if __name__ == "__main__":
    main_works()
    main_fails()

module/udf.py

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

GREETING = "Hi"
COWBOY_GREETING = "Howdy"

def _get_greeting() -> str:
    return COWBOY_GREETING

@udf(returnType=StringType())
def send_message(message: str) -> str:
    # References only a module-level constant: works locally.
    return f"{GREETING}, {message}!"

@udf(returnType=StringType())
def send_complex_message(message: str) -> str:
    # Calls a module-level helper function: fails locally (see traceback below).
    greeting_str = _get_greeting()
    return f"{greeting_str}, {message}!"

Traceback when running locally

The second function fails: cloudpickle serializes the referenced helper _get_greeting by reference to the local module package, which cannot be imported on the cluster when the UDF is deserialized.

Traceback (most recent call last):
  File "main.py", line 31, in <module>
    main_fails()
  File "main.py", line 27, in main_fails
    df.show()
  [shortened]
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'module'
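
If the helper must stay at module level, one hedged option is cloudpickle's register_pickle_by_value, which forces everything from a package to be serialized by value so the cluster never needs to import it. A minimal sketch, assuming databricks-connect pickles UDFs with PySpark's vendored cloudpickle (the pyspark.cloudpickle import path is an assumption):

```
import module  # the local package containing udf.py

# Assumption: databricks-connect serializes UDFs via PySpark's vendored
# cloudpickle; registering the package makes it pickle by value, so the
# cluster does not need 'module' on its import path.
from pyspark import cloudpickle

cloudpickle.register_pickle_by_value(module)
```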

Expected result and question

The expected output is shown below. How do you properly debug Databricks workflows locally if UDFs cannot call other functions from their own module? It makes them very hard to debug. For now we develop code behind a "local-use == True" flag that works around UDFs entirely.

+---+-----+----------+
| id| name|   message|
+---+-----+----------+
|  1|James|Hi, James!|
|  2| John| Hi, John!|
+---+-----+----------+

and

+---+-----+-------------+
| id| name|      message|
+---+-----+-------------+
|  1|James|Howdy, James!|
|  2| John| Howdy, John!|
+---+-----+-------------+
 
1 REPLY

tp992
New Contributor II

I think the solution is in .addArtifact, if I am reading this correctly, but I have not gotten it to work just yet:
```
import venv_pack

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

# Pack the local virtualenv and ship it to the cluster.
venv_pack.pack(output="pyspark_venv.tar.gz")
spark.addArtifact(
    "pyspark_venv.tar.gz#environment",
    archive=True)
spark.conf.set(
    "spark.sql.execution.pyspark.python", "environment/bin/python")

# Ship the UDF module so executors can import it when unpickling.
spark.addArtifact("minimal_example_1/module/udf.py", pyfile=True)
```
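
One possible catch with the last line: adding udf.py on its own would make it importable as a top-level udf, while the pickled UDF looks for module.udf. A hedged sketch that ships the whole package instead, assuming the PySpark 3.5+ SparkSession.addArtifacts API, that the script runs from the project root, and that module/ is a proper package (the module_pkg archive name is made up):

```
import shutil

# Zip the package directory so 'module' (and therefore 'module.udf')
# resolves on the executors when the UDF is unpickled.
shutil.make_archive("module_pkg", "zip", root_dir=".", base_dir="module")
spark.addArtifacts("module_pkg.zip", pyfile=True)
```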
