Hi @Retired_mod, Thank you so much for the detailed reply!
TL;DR: How do you use `--py-files` with databricks-connect, given that you are not explicitly using `spark-submit`? (databricks-connect: step-5-add-code)
1. Understanding the Problem:
- Given that the issue revolves around UDF execution, it is the worker nodes (the cluster) that need access to the module. This is verified by the fact that non-UDF (local) functions have no problem accessing the module. In fact, we can modify the function to use a loop rather than `applyInPandas` and it runs fine:
import pandas as pd

from src import local_module


def udf_wrapper(spark, df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    # This for loop is a slower, non-distributed way to do what `applyInPandas` is doing
    new_df = pd.DataFrame()
    for test_name in df.select("test_name").distinct().toPandas()["test_name"].tolist():
        pdf = df.filter(df.test_name == test_name).toPandas()
        new_out = udf(pdf)  # same call the UDF would make, but on the driver
        new_df = pd.concat([new_df, new_out])
    return spark.createDataFrame(new_df)  # df.groupby("test_name").applyInPandas(udf, schema=schema)


df = spark.sql('some fun query')
final_df = udf_wrapper(spark, df)
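For reference, the distributed version that fails is just the commented-out `applyInPandas` line above; reconstructed here for clarity (same imports and `udf` as in the snippet):

def udf_wrapper(spark, df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        return local_module.function(pdf)

    # This is the call where the workers raise ModuleNotFoundError for the local module
    return df.groupby("test_name").applyInPandas(udf, schema=schema)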
2. PYTHONPATH Environment Variable:
- Based on 1, we can say that PYTHONPATH is set correctly for the driver node (the local env); a sanity-check sketch of the driver-vs-worker split is below.
- Setting it on the cluster requires the package to be on Databricks. That is typically fine, but if the module is part of the package being tested locally, any local changes will not be reflected. This should not be an issue with true unit tests, but dependencies are sometimes hard to break or mock (maybe I just need to be a better programmer).
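As a sanity check on that driver-vs-worker split, here is a rough sketch of the kind of probe I mean (the `probe` helper is hypothetical; it reuses `df` and `local_module` from section 1):

import importlib.util

import pandas as pd

from src import local_module  # succeeds on the driver (local env)


def probe(pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs on the workers: report whether `src` is importable there
    found = importlib.util.find_spec("src") is not None
    return pd.DataFrame({"test_name": [pdf["test_name"].iloc[0]], "worker_sees_src": [found]})


df.groupby("test_name").applyInPandas(probe, schema="test_name string, worker_sees_src boolean").show()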
3. Worker Nodes and `--py-files`:
- Here we seem to have a disconnect. When using databricks-connect I am not explicitly using `spark-submit` (though it may be used under the hood), but rather creating a Spark session as shown in the docs: databricks-connect: step-5-add-code
- Is there a way to use `--py-files` with databricks-connect when creating a Spark session as shown in the docs?
- What I wanted to do was use `spark.sparkContext.addPyFile` in my pytest `conftest.py`, but alas `sparkContext` is no longer available in databricks-connect:
import io
import os
import zipfile

import pytest


def worker_node_context(spark_func):
    # This was just being attempted, so it is not fully formed.
    # Sadly it does not work, due to sparkContext not being available.
    def inner(*args, **kwargs):
        spark = spark_func(*args, **kwargs)

        # create a zip file of the src directory
        path = os.getcwd()
        if "tests" in path:  # handle local vs databricks
            os.chdir("../")
        src_zip = io.BytesIO()
        with zipfile.ZipFile(src_zip, "w") as myzip:
            for file in os.listdir("src"):
                myzip.write(os.path.join("src", file), file)

        # add local packages to the spark context
        # (addPyFile expects a file path, so the in-memory zip would need to be
        # written to disk first -- moot anyway, since sparkContext is missing)
        spark.sparkContext.addPyFile(src_zip)
        return spark

    return inner


@pytest.fixture
@worker_node_context
def spark():
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        # If running in databricks
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    else:
        # If running locally
        from databricks.connect import DatabricksSession
        from databricks.sdk.core import Config

        # get a spark session using the Databricks SDK's Config class:
        config = Config(
            host=os.environ.get("DATABRICKS_HOST"),
            token=os.environ.get("DATABRICKS_TOKEN"),
            cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
        )
        spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()
    return spark
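- Side note on what I was hoping to find: newer Spark Connect style sessions expose `SparkSession.addArtifacts` (with a `pyfile` flag), which looks like the session-level analogue of `addPyFile`. I have not verified that databricks-connect supports it for this case, so treat the following as a hedged sketch of the shape I am after (the `add_src_to_session` helper and the temp-file handling are mine):

import os
import shutil
import tempfile


def add_src_to_session(spark):
    # Hypothetical helper: zip ./src to a real file on disk and ship it to the
    # session. Assumes the installed databricks-connect / pyspark version
    # actually exposes SparkSession.addArtifacts -- unverified.
    tmp_dir = tempfile.mkdtemp()
    zip_path = shutil.make_archive(
        os.path.join(tmp_dir, "src"),  # -> <tmp_dir>/src.zip
        "zip",
        root_dir=".",   # run from the repo root so the archive contains src/...
        base_dir="src",
    )
    spark.addArtifacts(zip_path, pyfile=True)
    return spark

If that API is supported, it could slot into the `worker_node_context` decorator above in place of the `sparkContext.addPyFile` call.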
4. Alternative Approach / 5. Unit Testing and CI Flows:
- Same issues as mentioned in 2 (the needed module is part of the package we are testing).
- Mocking the module with pytest-mock does not solve the issue, as it still yields the ModuleNotFoundError, e.g. `mocker.patch("src.local_module", return_value=out_pd)` (sketch of the attempted test below).
- Integration tests will still be an issue unless we deploy to a QC repo in Databricks before we run our CI integration tests, which is an odd CI/CD flow.
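For completeness, the mock-based attempt looks roughly like this (a sketch only: `out_pd` is a stand-in pandas DataFrame and `udf_wrapper` is the `applyInPandas` version from section 1). It fails because the patch only applies in the driver process, while the workers still raise ModuleNotFoundError when the UDF runs:

import pandas as pd


def test_udf_wrapper(spark, mocker):  # mocker comes from pytest-mock
    out_pd = pd.DataFrame({"type": ["demo"], "test_name": ["t1"]})

    # Patching happens on the driver; the workers still try (and fail)
    # to import src.local_module when the UDF executes.
    mocker.patch("src.local_module", return_value=out_pd)

    df = spark.createDataFrame(out_pd)
    final_df = udf_wrapper(spark, df)  # udf_wrapper as defined in section 1
    assert final_df.count() == 1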