
databricks-connect: PandasUDFs importing local packages: ModuleNotFoundError

AFox
Contributor

databricks-connect==14.1.0

Related to other posts:

Using a local package in a Pandas UDF like:

 

import pandas as pd
import local_module

def udf_wrapper(df):
    schema = "type string, test_name string"
    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out
    return df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(df)

 

results in: ModuleNotFoundError: No module named 'local_module'

Running in a Databricks notebook works just fine.

 

Trying to add local_module to the Spark context via `spark.sparkContext.addPyFile("local_module.zip")` does not work, as `sparkContext` is no longer available on the Spark session returned by databricks-connect.

 

Is there a way around this? This seems to be a severe limitation that prevents proper unit-test and CI flows for UDFs; a solution would be greatly appreciated.

3 REPLIES

Hi @Retired_mod, thank you so much for the detailed reply!

TL;DR: How do you use `--py-files` with databricks-connect, given that you are not explicitly using `spark-submit`? See databricks-connect: step-5-add-code.

1. Understanding the Problem:

  • Given that the issue revolves around UDF execution, it is the worker nodes (the cluster) that need access to the module. This is verified by the fact that non-UDF (local) functions have no problem accessing the module. In fact, we can modify the function to use a loop rather than `applyInPandas` and it runs fine:

 

import pandas as pd
from src import local_module

def udf_wrapper(spark, df):
    schema = "type string, test_name string"
    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    # This for loop is a slower non distributed way to do what `applyInPandas` is doing
    new_df = pd.DataFrame()
    for test_name in df.select("test_name").distinct().toPandas()["test_name"].tolist():
        pdf = df.filter(df.test_name == test_name).toPandas()
        new_out = udf(pdf)  # apply the same function the UDF would run
        new_df = pd.concat([new_df, new_out])
    return spark.createDataFrame(new_df)  # df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(spark, df)

 

2. PYTHONPATH Environment Variable:

  • Based on 1, we can say that PYTHONPATH is set correctly for the driver node (the local env).
  • Setting it on the cluster requires the package to be on Databricks. That is typically fine, but if this module is part of the package being tested locally, any local changes will not be reflected. This should not be an issue with true unit tests, but dependencies are sometimes hard to break or mock (maybe I just need to be a better programmer). A quick worker-side import probe is sketched below.
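
As a quick way to confirm this driver-vs-worker split (a minimal sketch, not from the original post; it assumes the `df` from above with a `test_name` column, and the module name may need adjusting, e.g. to `src.local_module`), you can ask the workers themselves whether the module resolves:

import importlib.util
import pandas as pd

def probe(pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs on the cluster workers via applyInPandas, so this reflects the
    # worker environment rather than the local (driver) one.
    found = importlib.util.find_spec("local_module") is not None
    return pd.DataFrame({"test_name": [pdf["test_name"].iloc[0]],
                         "module_found": [str(found)]})

df.groupby("test_name").applyInPandas(
    probe, schema="test_name string, module_found string"
).show()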

3. Worker Nodes and --py-files:

  • Here we seem to have a disconnect. When using databricks-connect I am not explicitly using `spark-submit` (albeit maybe under the hood), but rather creating a Spark session as shown in the docs: databricks-connect: step-5-add-code.
  • Is there a way to use `--py-files` with databricks-connect when creating a Spark session as shown in the docs?
  • What I wanted to do is use `spark.sparkContext.addPyFile` in my pytest conftest.py, but alas `sparkContext` is no longer available in databricks-connect:

 

import io
import os
import zipfile
import pytest


def worker_node_context(spark_func):
    # This was just being attempted so not fully formed
    # Sadly, it does not work due to sparkContext not being available
    def inner(*args, **kwargs):
        spark = spark_func(*args, **kwargs)
        # create zip file of src directory
        path = os.getcwd()
        if "tests" in path:  # handle local vs databricks
            os.chdir("../")
        src_zip = io.BytesIO()
        with zipfile.ZipFile(src_zip, "w") as myzip:
            for file in os.listdir("src"):
                myzip.write(os.path.join("src", file), file)

        # add local packages to spark context
        spark.sparkContext.addPyFile(src_zip)

        return spark

    return inner


@pytest.fixture
@worker_node_context
def spark():
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        # If running in databricks
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    else:
        # If running locally
        from databricks.connect import DatabricksSession
        from databricks.sdk.core import Config

        # get spark session using Databricks SDK's Config class:
        config = Config(
            host=os.environ.get("DATABRICKS_HOST"),
            token=os.environ.get("DATABRICKS_TOKEN"),
            cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
        )
        spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()
    return spark

 

4. Alternative Approach / 5. Unit Testing and CI Flows:

  • Same issues as mentioned in 2 (the needed module is part of the package we are testing).
  • Mocking the module with pytest-mock does not solve the issue, as it still yields the ModuleNotFoundError, e.g. mocker.patch("src.local_module", return_value=out_pd); sketched below.
  • Integration tests will still be an issue unless we deploy to a QC repo in Databricks before we run our CI integration tests, which is an odd CI/CD flow.
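
For illustration, the attempted test looked roughly like this (a hedged sketch; `out_pd`, the DataFrame contents, and the test body are made up, and `udf_wrapper` is the applyInPandas version from the original post):

import pandas as pd

def test_udf_wrapper(spark, mocker):
    out_pd = pd.DataFrame({"type": ["a"], "test_name": ["t1"]})
    # The patch only affects the local (driver) interpreter; the workers
    # running applyInPandas still try to import the module themselves.
    mocker.patch("src.local_module", return_value=out_pd)

    df = spark.createDataFrame(out_pd)
    final_df = udf_wrapper(df)  # the applyInPandas version defined above
    final_df.collect()          # still raises ModuleNotFoundError on the workers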

AFox
Contributor

@Retired_mod Thank you for the response, but none of this seems valid for current databricks-connect versions.

  1. As far as I can tell there is no `databricks-connect` CLI in current versions, so `databricks-connect configure` will not work.
  2. `spark.conf.set` yields the error: cannot modify the value of the Spark config: "spark.submit.pyFiles". This has been a no-go since Spark 3.0, see: here (the failing call is sketched below).
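
For reference, the failing call was along these lines (a sketch; the zip path is hypothetical):

# spark.submit.pyFiles cannot be changed on a running session since Spark 3.0:
spark.conf.set("spark.submit.pyFiles", "/dbfs/tmp/src.zip")
# -> Error: Cannot modify the value of the Spark config: "spark.submit.pyFiles"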

If I am missing something, could you provide a working example of your last response?

AFox
Contributor

There is a way to do this!!

 

spark.addArtifact(src_zip_path, pyfile=True)

 

Some things of note:

  • This only works on single-user (non-shared) clusters.
  • src_zip_path must be a posix-style path string (i.e. forward slashes), even on Windows (drop C: and replace the backslashes).
  • The zip file must contain the root folder: src.zip -> src -> module.py (a full end-to-end sketch is at the end of this reply).
  • There is currently an issue on Windows that requires you to patch `os.path` like:

 

import os
import posixpath  # part of the Python standard library

original_path = os.path
os.path = posixpath  # need to patch on Windows
spark.addArtifact(src_zip_path, pyfile=True)
os.path = original_path  # put it back

 

Databricks is aware of the issue, so this may change in the near future.

  • I tested in both databricks-connect 13 and 14.
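
Putting these notes together, here is a rough end-to-end sketch (the paths, the dist/ location, and the drive-letter handling are illustrative, not an official recipe) of shipping a local src package to the cluster with databricks-connect:

import os
import pathlib
import zipfile

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

# Build src.zip so the archive contains the root folder: src.zip -> src -> module.py
src_zip = pathlib.Path("dist/src.zip")
src_zip.parent.mkdir(exist_ok=True)
with zipfile.ZipFile(src_zip, "w") as myzip:
    for file in pathlib.Path("src").rglob("*.py"):
        myzip.write(file, file.as_posix())  # arcname keeps the "src/" prefix

# addArtifact wants a posix-style path string, even on Windows
src_zip_path = src_zip.resolve().as_posix()
if os.name == "nt":
    src_zip_path = src_zip_path.split(":", 1)[-1]  # drop the drive letter, e.g. "C:"

spark.addArtifact(src_zip_path, pyfile=True)  # on Windows, may also need the os.path patch above

# The workers can now resolve the package inside applyInPandas:
# from src import local_module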

Thanks to the Databricks support team for helping me with the solution!

 
