
Limitations with UDFs wrapping modules imported via Repos files?

Tsar
New Contributor III

We have been importing custom module wheel files from our Azure DevOps repository. We are moving to Databricks Repos arbitrary files to simplify this, but doing so breaks our Spark UDF that wraps one of the functions in the library, raising a ModuleNotFoundError. Is this a limitation of using Repos files / notebook-scoped modules, or are we missing a step?

UPDATE: I implied this in the code comments, but I should state explicitly that the import from Repos files works. And, as it turns out, calling the UDF directly works too. But this does not work: df = df.withColumn("col1", normalize_stringUDF(F.col("col1")))

# PySpark imports used below
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

# Add the Repos files to sys.path
import sys
sys.path.append("/Workspace/Repos/refined_pipeline_library/refined_pipeline")

# Import the function from Repos
from refined_pipeline.data_utils.data_utils import normalize_string

# Define the UDF
normalize_stringUDF = udf(lambda col_name: normalize_string(col_name))

# This always works
print(normalize_string("wjfhw//ef efuehf fheu ehe *&*H"))

# This fails when importing the module from Repos, but works when installing the module's wheel on the cluster
df = df.withColumn("col1", normalize_stringUDF(F.col("col1")))

The error is:

PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 71) (10.16.12.16 executor 0): org.apache.spark.api.python.PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 466, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'refined_pipeline''. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 466, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'refined_pipeline'

12 REPLIES

AmanSehgal
Honored Contributor III

Importing Python modules from a repo in a notebook is in public preview.

You can ask your Databricks account manager to enable it in your workspace.

@Kaniz Fatma, can you help @Tim Kracht with this?

Kaniz
Community Manager

Hi @Tim Kracht and @Aman Sehgal, I'm sure I can help you with this. Let me get back to you.

Kaniz
Community Manager

Hi @Aman Sehgal and @Tim Kracht, this should be available very soon.

Hubert-Dudek
Esteemed Contributor III

Actually, it is working for me, and I don't even use sys.path.append; I use the full path in the from statement:

from refined_pipeline_library.refined_pipeline.data_utils.data_utils import normalize_string

then, in the data_utils folder, I put the file data_utils.py that defines normalize_string,

additionally, I always add an empty __init__.py file,

and of course "Files in Repos" needs to be enabled in the admin console 🙂
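
For illustration, a package layout along these lines would match that full-path import (the folder and file names mirror the thread's examples and are an assumption about the repo structure, not a verified setup):

# Illustrative layout; assumes the folder containing refined_pipeline_library/
# is on sys.path (for example, because the notebook lives in the same repo)
#
#   refined_pipeline_library/
#       __init__.py                  # empty
#       refined_pipeline/
#           __init__.py              # empty
#           data_utils/
#               __init__.py          # empty
#               data_utils.py        # defines normalize_string

from refined_pipeline_library.refined_pipeline.data_utils.data_utils import normalize_string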

Tsar
New Contributor III

@Aman Sehgal @Kaniz Fatma I have "Files in Repos" enabled. As I said, it successfully imports the module from Repos, but it does not work when I call a function from that module in a UDF.

Tsar
New Contributor III

@Hubert Dudek Thanks for the info. You got it to work in a UDF?

Also facing the same issue. Did you ever figure this out?

Please see my answer below

Scott_B
New Contributor III

If your notebook is in the same repo as the module, this should work without any modifications to sys.path.

If your notebook is not in the same repo as the module, you may need to make sure sys.path is correct on all nodes of your cluster that need the module. For example, this code should work for you:

# Create a wrapper function around my module that updates sys.path
import sys
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

def my_wrapper_function(x):
    # Runs on the executor, so add the repo path and import there
    sys.path.append("/Workspace/Repos/user_name/repo_name")
    from repo_name import lib_function
    return lib_function(x)

# Define the UDF
my_udf = udf(lambda col_name: my_wrapper_function(col_name))

# This should work now
df = df.withColumn("col1", my_udf(F.col("col1")))

Hubert-Dudek
Esteemed Contributor III

The runtime version used also matters. In general, I have found it works better on 11.x (there is even a Files in Repos version to choose in the admin console).

Other improvements to this functionality are coming in upcoming runtimes.

BioData41
New Contributor III

@Kaniz Fatma, or anyone at Databricks, why does importing from a different repo in the workspace on all the executors have to be so 'hacky'? Is it possible for the root of the workspace to be on the Python path by default, rather than only the repo of the running notebook? Is that part of the product roadmap, or is it not a practical option for some reason?

I realize that building the package as a wheel and installing it on all the nodes would work, but our system engineers want to avoid installing custom packages. So we were looking at Repos as a way to use custom packages that can be reused across many notebooks, with those notebooks potentially living in different repos; nesting the custom package inside one notebook repo, or inside all of them, would not be practical.

But actually, I can't even get this suggestion by @Scott_B to work. The executors on the worker nodes just can't seem to find the package.

Just like the OP, I can import the package fine, except when I wrap it in (in my case) a call to DataFrame.applyInPandas() rather than in a UDF.
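
For what it's worth, the same deferred-import pattern from Scott's wrapper can be adapted to applyInPandas by appending to sys.path and importing inside the function that runs on the executors. A minimal sketch, assuming the OP's repo path and module names and a hypothetical grouping column some_key:

def normalize_group(pdf):
    # Runs on the executors, so make the repo visible there before importing
    import sys
    sys.path.append("/Workspace/Repos/refined_pipeline_library/refined_pipeline")
    from refined_pipeline.data_utils.data_utils import normalize_string
    # pdf is the pandas DataFrame for one group; normalize col1 in place
    pdf["col1"] = pdf["col1"].map(normalize_string)
    return pdf

# The output schema must be declared up front; here the columns are unchanged
df = df.groupBy("some_key").applyInPandas(normalize_group, schema=df.schema)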

Scott_B
New Contributor III

As of DBR 13.0, additions to sys.path on the driver node that start with /Workspace are propagated to worker nodes, so the particular use case described by OP should work out of the box on DBR 13+.
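
In other words, on DBR 13 and above the OP's original pattern should work as-is, without the executor-side wrapper. A minimal sketch, assuming the same repo path and function names as in the original post:

import sys
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

# On DBR 13+, /Workspace paths appended to sys.path on the driver are
# propagated to the workers, so the import also resolves inside the UDF
sys.path.append("/Workspace/Repos/refined_pipeline_library/refined_pipeline")
from refined_pipeline.data_utils.data_utils import normalize_string

normalize_stringUDF = udf(lambda col_name: normalize_string(col_name))
df = df.withColumn("col1", normalize_stringUDF(F.col("col1")))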
