databricks-connect: PandasUDFs importing local packages: ModuleNotFoundError

AFox
Contributor

databricks-connect==14.1.0


Using a local package in a Pandas UDF like:

 

import pandas as pd
import local_module

def udf_wrapper(df):
    schema = "type string, test_name string"
    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out
    df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(df)

 

results in: ModuleNotFoundError: No module named 'local_module'

Running in a Databricks notebook works just fine.

 

Trying to add local_module to the Spark context via `spark.sparkContext.addPyFile("local_module.zip")` does not work, as `sparkContext` is no longer available on the Spark session from databricks-connect.

 

Is there a way around this? This seems to be a severe limitation that prevents proper unit testing and CI flows for UDFs; a solution would be greatly appreciated.

6 REPLIES

Kaniz
Community Manager

Hi @AFox, let’s explore some potential solutions to address this limitation:

  1. Understanding the Problem:

    • When using databricks-connect, the driver program runs on your local machine, and the worker nodes execute tasks on the cluster.
    • The key to solving this issue lies in understanding whether the driver or worker nodes need access to the module files.
  2. PYTHONPATH Environment Variable:

    • Since you’re submitting your application in client mode, the machine where you run the spark-submit command (i.e., the driver node) needs access to the module files.
    • To resolve this, add your module’s path to the PYTHONPATH environment variable on the node where you’re submitting the job.
    • For example:
      export PYTHONPATH=$PYTHONPATH:/path/to/your/module
      
    • This way, the driver node can find the module without explicitly zipping and shipping it using --py-files.
  3. Worker Nodes and --py-files:

    • If the worker nodes also need access to your module files (for example, if your UDFs execute on worker nodes), you should pass the module as a zip archive using --py-files.
    • Ensure that the --py-files argument precedes your main Python file argument in the spark-submit command:
      ./bin/spark-submit --py-files local_module.zip mycode.py
      
  4. Alternative Approach:

    • If dynamically setting the PYTHONPATH on the worker nodes is not feasible, consider an alternative approach:
      • Package your module as a Python package (with an __init__.py file) and distribute it via a Python package manager (e.g., pip).
      • Install the package on the worker nodes using a bootstrap script or a custom initialization script.
      • This way, the package will be available to both the driver and worker nodes.
  5. Unit Testing and CI Flows:

    • For unit testing and continuous integration (CI) flows, consider creating separate test environments where you can install your package and run tests.
    • Use tools like pytest or unittest to write and execute tests against your UDFs (a minimal sketch follows below).
    • CI/CD pipelines can automate the testing process, ensuring that your UDFs work correctly.

Remember that the choice between driver and worker nodes depends on whether the module is needed during task execution. Adjust your approach based on your specific requirements.
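
For point 5, here is a minimal sketch of what a Spark-free unit test for the pandas logic inside the UDF might look like. The `local_module.function` call and its output columns are assumptions taken from the snippets in this thread, not a confirmed API:

import pandas as pd
import local_module  # the local package from the original question


def test_local_module_function_returns_expected_columns():
    # Build a small input frame matching the UDF schema "type string, test_name string"
    pdf = pd.DataFrame({"type": ["a", "b"], "test_name": ["t1", "t1"]})

    out = local_module.function(pdf)

    # The pandas logic can be exercised without a Spark session or a cluster;
    # the asserted columns mirror the schema string used in the thread.
    assert isinstance(out, pd.DataFrame)
    assert set(out.columns) == {"type", "test_name"}

Tests like this run entirely locally, so they never hit the ModuleNotFoundError; the cluster-side import problem only appears once `applyInPandas` actually ships the UDF to the workers.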

If you encounter any further issues, feel free to seek additional assistance or provide feedback to the Databricks community for potential enhancements.

Happy coding! 🚀

Hi @Kaniz, Thank you so much for the detailed reply!

TL;DR: How do you use `--py-files` with databricks-connect, given that you are not explicitly using `spark-submit`? (See databricks-connect: step-5-add-code.)

1. Understanding the Problem:

  • Given that the issue revolves around UDF execution, it is the worker nodes (the cluster) that need access to the module. This is verified by the fact that non-UDF (local) functions have no problem accessing the module. In fact, we can modify the function to use a loop rather than `applyInPandas` and it runs fine:

 

import pandas as pd
from src import local_module

def udf_wrapper(spark, df):
    schema = "type string, test_name string"
    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    # This for loop is a slower, non-distributed way to do what `applyInPandas` does
    new_df = pd.DataFrame()
    for test_name in df.select("test_name").distinct().toPandas()["test_name"].tolist():
        pdf = df.filter(df.test_name == test_name).toPandas()
        new_out = udf(pdf)
        new_df = pd.concat([new_df, new_out])
    return spark.createDataFrame(new_df)  # df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(spark, df)

 

2. PYTHONPATH Environment Variable:

  • Based on 1, we can say that the PYTHONPATH is set correctly for the driver node (local env).
  • Setting it on the cluster requires the package to be on Databricks. That is typically fine, but if this module is part of the package you are testing locally, any local changes will not be reflected. This should not be an issue with true unit tests, but dependencies are sometimes hard to break or mock (maybe I just need to be a better programmer).

3. Worker Nodes and --py-files:

  • Here we seem to have a disconnect. When using databricks-connect I am not explicitly using `spark-submit` (although it may be used under the hood) but rather creating a Spark session as shown in the docs: databricks-connect: step-5-add-code
  • Is there a way to use --py-files with databricks-connect when creating a Spark session as shown in the docs?
  • What I wanted to do is use `spark.sparkContext.addPyFile` in my pytest conftest.py, but alas `sparkContext` is no longer available in databricks-connect:

 

import io
import os
import zipfile
import pytest


def worker_node_context(spark_func):
    # This was just being attempted, so it is not fully formed
    # Sadly it does not work, due to sparkContext not being available
    def inner(*args, **kwargs):
        spark = spark_func(*args, **kwargs)
        # create zip file of src directory
        path = os.getcwd()
        if "tests" in path:  # handle local vs databricks
            os.chdir("../")
        src_zip = io.BytesIO()
        with zipfile.ZipFile(src_zip, "w") as myzip:
            for file in os.listdir("src"):
                myzip.write(os.path.join("src", file), file)

        # add local packages to the Spark context
        # (fails: sparkContext is unavailable with databricks-connect, and
        # addPyFile expects a file path rather than a BytesIO)
        spark.sparkContext.addPyFile(src_zip)

        return spark

    return inner


@pytest.fixture
@worker_node_context
def spark():
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        # If running in databricks
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    else:
        # If running locally
        from databricks.connect import DatabricksSession
        from databricks.sdk.core import Config

        # get spark session using Databricks SDK's Config class:
        config = Config(
            host=os.environ.get("DATABRICKS_HOST"),
            token=os.environ.get("DATABRICKS_TOKEN"),
            cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
        )
        spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()
    return spark

 

4. Alternative Approach / 5. Unit Testing and CI Flows:

  • Same issues as mentioned in 2 (needed module is part of the package we are testing)
  • Mocking the module with pytest-mock does not solve the issue, as it still yields the ModuleNotFoundError. e.g.: mocker.patch("src.local_module", return_value=out_pd)
  • Integration tests will still be an issue unless we deploy to a QC repo in Databricks before we run our CI integration tests, which is an odd CI/CD flow.

Kaniz
Community Manager

Hi @AFox, certainly! When using Databricks Connect without explicitly invoking spark-submit, you can still manage Python dependencies using the --py-files option. Here’s how:

  1. Databricks Connect and Python Dependencies:

    • Databricks Connect allows you to interact with a Databricks cluster from your local development environment.
    • To manage Python dependencies, you can use the --py-files option to distribute additional Python files (such as libraries or modules) to the cluster.
  2. Steps to Use --py-files with Databricks Connect:

    • Step 1: Prepare Your Python Dependencies:
      • Create a Python package (e.g., a .zip file) containing the necessary dependencies (modules, libraries, etc.).
      • Ensure that your package includes all required files (including any C/C++ dependencies like pyarrow or NumPy).
    • Step 2: Start Databricks Connect:
      • Open a terminal or command prompt.
      • Run the following command to start Databricks Connect:
        databricks-connect configure
        
    • Step 3: Specify Dependencies Using --py-files:
      • In your Python script or notebook, add the following line to specify the Python dependencies:
        spark.conf.set("spark.submit.pyFiles", "/path/to/your/package.zip")
        
      • Replace /path/to/your/package.zip with the actual path to your Python package.
    • Step 4: Run Your Code:
      • Execute your Python code as usual. Databricks Connect will distribute the specified package to the cluster.
  3. Example:

    • Suppose you have a package named my_dependencies.zip containing necessary Python files.
    • In your Python script, set the configuration:
      spark.conf.set("spark.submit.pyFiles", "/path/to/my_dependencies.zip")
      
    • Your code will now run with the specified dependencies available on the cluster.

Remember that this approach works seamlessly with Databricks Connect, allowing you to manage Python dependencies effectively without explicitly using spark-submit. 🐍🚀

AFox
Contributor

@Kaniz Thank you for the response, but none of this seems valid for current databricks-connect versions.

  1. As far as I can tell there is no `databricks-connect` CLI in the current versions, so `databricks-connect configure` will not work
  2. `spark.conf.set` yields the error: cannot modify the value of the Spark config: "spark.submit.pyFiles". This has been a no-go since Spark 3.0, see: here

If I am missing something, could you provide a working example of your last response?

Kaniz
Community Manager

Hi @AFox, I apologize for any confusion. In Spark 3.0 and above, the SET command does not work on SparkConf entries. This behaviour is by design. If you encounter an error like “Cannot modify the value of a Spark config: spark.submit.pyFiles,” here’s how you can address it:

  1. Remove SET Commands: Remove any SET commands for SparkConf entries from your notebook. These commands won’t work as expected in recent Spark versions.

  2. Cluster-Level Configuration: Instead, set SparkConf values at the cluster level by entering them in the cluster’s Spark config. You can do this in the following ways:

    • AWS Databricks: Configure Spark properties in the cluster settings.
    • Azure Databricks: Adjust Spark configurations in the Azure portal.
    • Google Cloud Databricks: Set Spark properties in the cluster configuration.
  3. Restart the Cluster: After making changes to the cluster-level Spark config, restart the cluster to apply the new settings.

Here’s an example of how you can set a Spark configuration property programmatically using spark.conf.set:

# Example: Set a custom value for spark.driver.host
spark.conf.set("spark.driver.host", "myhost")

Remember to replace "myhost" with your desired value. This approach avoids modifying SparkConf entries directly in the notebook and adheres to the recommended practices for Spark configuration.

For more details, you can refer to the Databricks documentation on this topic.

AFox
Contributor

There is a way to do this!!

 

spark.addArtifact(src_zip_path, pyfile=True)

 

Some things of note:

  • This only works on single user (non shared) clusters
  • src_zip_path must be a posixpath-style string (i.e. forward slashes) even on Windows (drop C: and replace the backslashes)
  • The zip file must contain the root folder: src.zip -> src -> module.py
  • There is currently an issue on Windows that requires you to patch `os.path` like:

 

import posixpath  # part of the standard library
orig_path = os.path
os.path = posixpath  # needs to be patched on Windows
spark.addArtifact(src_zip_path, pyfile=True)
os.path = orig_path  # put it back

 

They are aware of the issue, so this may change in the near future.

  • I tested in both databricks-connect 13 and 14

Thanks to the Databricks support team for helping me with the solution!
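
For completeness, here is a minimal sketch of how the pieces above might fit together in a pytest conftest.py: zip the local src package (keeping the src root folder inside the archive) and register it with `spark.addArtifact(..., pyfile=True)`. The src layout, the environment variable names, and the try/finally around the Windows os.path patch are assumptions based on the snippets earlier in this thread, not a verified recipe:

import os
import posixpath
import zipfile

import pytest


def _zip_src(zip_path="src.zip"):
    # The archive must contain the root folder: src.zip -> src -> module.py
    with zipfile.ZipFile(zip_path, "w") as zf:
        for root, _, files in os.walk("src"):
            for name in files:
                full_path = os.path.join(root, name)
                zf.write(full_path, arcname=full_path)  # keep the leading "src/" prefix
    return zip_path


@pytest.fixture
def spark():
    from databricks.connect import DatabricksSession
    from databricks.sdk.core import Config

    config = Config(
        host=os.environ.get("DATABRICKS_HOST"),
        token=os.environ.get("DATABRICKS_TOKEN"),
        cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
    )
    spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()

    # Register the zipped src package so UDFs running on the workers can import it
    src_zip = _zip_src()
    original_os_path = os.path
    try:
        os.path = posixpath  # Windows workaround described above
        spark.addArtifact(src_zip, pyfile=True)
    finally:
        os.path = original_os_path  # always restore os.path
    return spark

As noted above, this only works on single user (non shared) clusters.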

 