11-20-2023 10:08 AM
databricks-connect==14.1.0
Related to other posts:
Using a local package in a Pandas UDF like:
import pandas as pd
import local_module

def udf_wrapper(df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    return df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(df)
results in: ModuleNotFoundError: No module named 'local_module'
Running in a Databricks notebook works just fine.
Trying to add the local_module to the spark context via `spark.sparkContext.addPyFile(local_module.zip)` does not work as `sparkContext` is no longer available in spark from databricks-connect.
Is there a way around this? This seems to be a severe limitation that prevents proper unit testing and CI flows for UDFs; a solution would be greatly appreciated.
11-21-2023 01:51 AM
Hi @AFox, Let's explore some potential solutions to address this limitation:
1. Understanding the Problem: With databricks-connect, the driver program runs on your local machine, and the worker nodes execute tasks on the cluster.
2. PYTHONPATH Environment Variable: The node from which you run the spark-submit command (i.e., the driver node) needs access to the module files. Set the PYTHONPATH environment variable on the node where you're submitting the job:
export PYTHONPATH=$PYTHONPATH:/path/to/your/module
Note that this only affects the driver; worker nodes need the module distributed via --py-files.
3. Worker Nodes and --py-files: Distribute the module to the worker nodes with --py-files. Ensure that the --py-files argument precedes your main Python file argument in the spark-submit command:
./bin/spark-submit --py-files local_module.zip mycode.py
4. Alternative Approach: If setting PYTHONPATH on the worker nodes is not feasible, consider an alternative approach: package your module (with an __init__.py file) and distribute it via a Python package manager (e.g., pip).
5. Unit Testing and CI Flows: Use a framework such as pytest or unittest to write and execute tests against your UDFs (see the sketch after this list).
Remember that the choice between driver and worker nodes depends on whether the module is needed during task execution. Adjust your approach based on your specific requirements.
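For point 5, a minimal pytest sketch that exercises the pandas-level logic directly, without Spark (local_module.function comes from the question; the expected columns are an assumption based on the stated schema):
import pandas as pd
import local_module  # the package under test, from the original question

def test_local_module_function_keeps_schema_columns():
    # Build a small pandas DataFrame matching the UDF input.
    pdf = pd.DataFrame({"type": ["a", "b"], "test_name": ["t1", "t1"]})
    out = local_module.function(pdf)
    # Assumed contract: output matches the "type string, test_name string" schema.
    assert list(out.columns) == ["type", "test_name"]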
If you encounter any further issues, feel free to seek additional assistance or provide feedback to the Databricks community for potential enhancements.
Happy coding!
11-21-2023 11:25 AM
Hi @Kaniz_Fatma, Thank you so much for the detailed reply!
TL;DR: How do you use `--py-files` with databricks-connect, given that you are not explicitly using `spark-submit`? databricks-connect: step-5-add-code
1. Understanding the Problem:
import pandas as pd
from src import local_module

def udf_wrapper(spark, df):
    schema = "type string, test_name string"

    def udf(pdf: pd.DataFrame) -> pd.DataFrame:
        out = local_module.function(pdf)
        return out

    # This for loop is a slower, non-distributed way to do what `applyInPandas` is doing
    new_df = pd.DataFrame()
    for test_name in df.select("test_name").distinct().toPandas()["test_name"].tolist():
        pdf = df.filter(df.test_name == test_name).toPandas()
        new_out = udf(pdf)
        new_df = pd.concat([new_df, new_out])
    return spark.createDataFrame(new_df)  # df.groupby("test_name").applyInPandas(udf, schema=schema)

df = spark.sql('some fun query')
final_df = udf_wrapper(spark, df)
2. PYTHONPATH Environment Variable:
3. Worker Nodes and --py-files:
import io
import os
import zipfile

import pytest

def worker_node_context(spark_func):
    # This was just being attempted, so it is not fully formed.
    # Sadly it does not work, due to sparkContext not being available.
    def inner(*args, **kwargs):
        spark = spark_func(*args, **kwargs)

        # create zip file of src directory
        path = os.getcwd()
        if "tests" in path:  # handle local vs databricks
            os.chdir("../")
        src_zip = io.BytesIO()
        with zipfile.ZipFile(src_zip, "w") as myzip:
            for file in os.listdir("src"):
                myzip.write(os.path.join("src", file), file)

        # add local packages to spark context
        spark.sparkContext.addPyFile(src_zip)
        return spark

    return inner
@pytest.fixture
@worker_node_context
def spark():
    if "DATABRICKS_RUNTIME_VERSION" in os.environ:
        # If running in databricks
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
    else:
        # If running locally
        from databricks.connect import DatabricksSession
        from databricks.sdk.core import Config

        # get spark session using Databricks SDK's Config class:
        config = Config(
            host=os.environ.get("DATABRICKS_HOST"),
            token=os.environ.get("DATABRICKS_TOKEN"),
            cluster_id=os.environ.get("DATABRICKS_CLUSTER_ID"),
        )
        spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()
    return spark
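For context, a test that consumes this fixture could look roughly like the following (the import path and assertion are hypothetical placeholders):
from src.udfs import udf_wrapper  # hypothetical location of udf_wrapper

def test_udf_wrapper(spark):
    # Hypothetical input matching the UDF's expected schema.
    df = spark.createDataFrame(
        [("a", "t1"), ("b", "t1")], schema="type string, test_name string"
    )
    final_df = udf_wrapper(spark, df)
    assert final_df.count() > 0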
4. Alternative Approach / 5. Unit Testing and CI Flows:
11-22-2023 11:02 PM
Hi @AFox, Certainly! When using Databricks Connect without explicitly invoking spark-submit, you can still manage Python dependencies using the --py-files option. Here's how:
Databricks Connect and Python Dependencies: You can use the --py-files option to distribute additional Python files (such as libraries or modules) to the cluster.
Steps to Use --py-files with Databricks Connect:
1. Create a Python package (e.g., a .zip file) containing the necessary dependencies (modules, libraries, etc.), including any required third-party libraries (e.g., pyarrow or NumPy).
2. Configure your connection by running databricks-connect configure.
3. Point Spark at the package via --py-files:
spark.conf.set("spark.submit.pyFiles", "/path/to/your/package.zip")
Replace /path/to/your/package.zip with the actual path to your Python package.
Example: Suppose you have my_dependencies.zip containing the necessary Python files (a sketch of building such a zip follows below):
spark.conf.set("spark.submit.pyFiles", "/path/to/my_dependencies.zip")
Remember that this approach works seamlessly with Databricks Connect, allowing you to manage Python dependencies effectively without explicitly using spark-submit.
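For reference, one way to build such a dependency zip with the standard library (the directory and archive names here are assumptions, not from the thread):
import shutil

# Zip the local package directory (./src) into my_dependencies.zip;
# make_archive returns the path of the archive it created.
zip_path = shutil.make_archive("my_dependencies", "zip", root_dir=".", base_dir="src")
print(zip_path)  # my_dependencies.zip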
11-27-2023 12:34 PM
@Kaniz_Fatma Thank you for the response but none of this seems valid for current databricks-connect versions.
If I am missing something, could you provide a working example of your last response?
11-27-2023 10:25 PM
Hi @AFox, I apologize for any confusion. In Spark 3.0 and above, the SET command does not work on SparkConf entries. This behaviour is by design. If you encounter an error like "Cannot modify the value of a Spark config: spark.submit.pyFiles," here's how you can address it:
Remove SET Commands: Remove any SET commands for SparkConf entries from your notebook. These commands won't work as expected in recent Spark versions.
Cluster-Level Configuration: Instead, set SparkConf values at the cluster level by entering them in the cluster's Spark config (for example, in the Spark config field on the cluster's configuration page).
Restart the Cluster: After making changes to the cluster-level Spark config, restart the cluster to apply the new settings.
Here's an example of how you can set a Spark configuration property programmatically using spark.conf.set:
# Example: Set a custom value for spark.driver.host
spark.conf.set("spark.driver.host", "myhost")
Remember to replace "myhost"
with your desired value. This approach avoids modifying SparkConf entries directly in the notebook and adheres to the recommended practices for Spark configuration.
For more details, you can refer to the Databricks documentation on this topic.
11-29-2023 05:55 PM
There is a way to do this!!
spark.addArtifact(src_zip_path, pyfile=True)
Some things of note:
import posixpath # this is a builtin
path2 = os.path
os.path = posixpath # need to patch on windows
spark.addArtifact(src_zip_path, pyfile=True)
os.path = path2 # put it back
They are aware of the issue so this may change in the near future.
Thanks to the Databricks support team for helping me with the solution!
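Pulling the thread together, a minimal sketch of wiring spark.addArtifact into the original example (the zip layout, paths, and Windows check are assumptions based on the posts above):
import os
import posixpath
import shutil

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

# Build a zip whose entries mirror how the package is imported (here: src/...).
src_zip_path = shutil.make_archive("src", "zip", root_dir=".", base_dir="src")

if os.name == "nt":
    # Workaround from the post: patch os.path with posixpath on Windows
    # so addArtifact builds POSIX-style paths, then restore it.
    real_path = os.path
    os.path = posixpath
    spark.addArtifact(src_zip_path, pyfile=True)
    os.path = real_path
else:
    spark.addArtifact(src_zip_path, pyfile=True)

# The zipped package can now be imported inside applyInPandas UDFs,
# so the original udf_wrapper should run without ModuleNotFoundError.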