11-20-2023 10:08 AM
databricks-connect==14.1.0
Related to other posts:
Using a local package in a Pandas UDF like:
import pandas as pd

import local_module


def udf_wrapper(df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    return df.groupby("test_name").applyInPandas(udf, schema=schema)


df = spark.sql('some fun query')
final_df = udf_wrapper(df)
results in: ModuleNotFoundError: No module named 'local_module'
Running in a Databricks notebook works just fine.
Trying to add local_module to the Spark context via `spark.sparkContext.addPyFile("local_module.zip")` does not work, as `sparkContext` is no longer available on the `spark` object from databricks-connect.
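For reference, a minimal sketch of the failure mode (this assumes databricks-connect is already configured via environment variables; the exact error message varies by version):

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

# Spark Connect sessions do not expose the classic SparkContext,
# so this raises an exception instead of shipping the zip to the workers:
spark.sparkContext.addPyFile("local_module.zip")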
Is there a way around this? This seems to be a severe limitation that prevents proper unit testing and CI flows for UDFs; a solution would be greatly appreciated.
11-21-2023 11:25 AM
Hi @Retired_mod, Thank you so much for the detailed reply!
TL;DR: How do you use `--py-files` with databricks-connect as you are not explicitly using `spark-submit`? databricks-connect: step-5-add-code
1. Understanding the Problem:
import pandas as pd

from src import local_module


def udf_wrapper(spark, df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    # This for loop is a slower, non-distributed way to do what `applyInPandas` does
    new_df = pd.DataFrame()
    for test_name in df.select("test_name").distinct().toPandas()["test_name"].tolist():
        pdf = df.filter(df.test_name == test_name).toPandas()
        new_out = udf(pdf)
        new_df = pd.concat([new_df, new_out])
    return spark.createDataFrame(new_df)  # df.groupby("test_name").applyInPandas(udf, schema=schema)


df = spark.sql('some fun query')
final_df = udf_wrapper(spark, df)
2. PYTHONPATH Environment Variable:
3. Worker Nodes and --py-files:
import io
import os
import zipfile

import pytest


def worker_node_context(spark_func):
    # This was just being attempted, so it is not fully formed.
    # Sadly it does not work, due to sparkContext not being available.
    def inner(*args, **kwargs):
        spark = spark_func(*args, **kwargs)

        # create a zip file of the src directory
        path = os.getcwd()
        if "tests" in path:  # handle local vs databricks
            os.chdir("../")
        src_zip = io.BytesIO()
        with zipfile.ZipFile(src_zip, "w") as myzip:
            for file in os.listdir("src"):
                myzip.write(os.path.join("src", file), file)

        # add local packages to the spark context
        spark.sparkContext.addPyFile(src_zip)
        return spark

    return inner
@pytest.fixture
@worker_node_context
def spark():
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        # If running in databricks
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    else:
        # If running locally
        from databricks.connect import DatabricksSession
        from databricks.sdk.core import Config

        # get a spark session using the Databricks SDK's Config class:
        config = Config(
            host=os.environ.get("DATABRICKS_HOST"),
            token=os.environ.get("DATABRICKS_TOKEN"),
            cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
        )
        spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()
    return spark
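For completeness, a hypothetical test that would consume this fixture (udf_wrapper as defined under point 1; the data values are illustrative and assume local_module.function returns one row per input row):

def test_udf_wrapper(spark):
    df = spark.createDataFrame(
        [("a", "t1"), ("b", "t2")],
        "type string, test_name string",
    )
    result = udf_wrapper(spark, df)
    assert result.count() == 2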
4. Alternative Approach / 5. Unit Testing and CI Flows:
11-27-2023 12:34 PM
@Retired_mod Thank you for the response, but none of this seems valid for current databricks-connect versions.
If I am missing something, could you provide a working example of your last response?
11-29-2023 05:55 PM
There is a way to do this!!
spark.addArtifact(src_zip_path, pyfile=True)
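A fuller sketch of the workflow (the path and the flat archive layout are assumptions; writing the modules at the zip root lets `import local_module` resolve once the artifact is on the Python path):

import os
import zipfile

src_zip_path = "src.zip"  # hypothetical location for the archive
with zipfile.ZipFile(src_zip_path, "w") as myzip:
    for file in os.listdir("src"):
        # the arcname drops the "src/" prefix so modules sit at the zip root
        myzip.write(os.path.join("src", file), file)

spark.addArtifact(src_zip_path, pyfile=True)  # ships the zip to the cluster's Python workers

Note that addArtifact needs a real file path on disk, so the zip cannot stay in an in-memory BytesIO buffer as in the earlier attempt.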
Some things of note:
import posixpath  # this is a builtin

path2 = os.path
os.path = posixpath  # need to patch os.path on Windows for addArtifact to work
spark.addArtifact(src_zip_path, pyfile=True)
os.path = path2  # put it back
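If the patch needs to survive an exception from addArtifact, the same workaround can be scoped with a context manager (a sketch; the patched call is identical):

import contextlib
import os
import posixpath


@contextlib.contextmanager
def posix_paths():
    # temporarily swap os.path for posixpath, restoring it even on error
    saved = os.path
    os.path = posixpath
    try:
        yield
    finally:
        os.path = saved


with posix_paths():
    spark.addArtifact(src_zip_path, pyfile=True)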
They are aware of the issue so this may change in the near future.
Thanks to the Databricks support team for helping me with the solution!