
ModuleNotFound error when using transformWithStateInPandas via a class defined outside the notebook

VaDim
New Contributor III

As per the Databricks documentation, when I define the class that extends `StatefulProcessor` in a notebook, everything works fine. However, execution fails with a ModuleNotFoundError as soon as the class definition is moved into its own module (a .py file) outside the notebook.

e.g.

Say I have the class in `/Workspace/python/module1/processor.py`

 

from pyspark.sql.streaming import StatefulProcessor

class Processor(StatefulProcessor):
    ...

and the notebook in `/Workspace/notebooks/notebook1.py`

import os
import sys

sys.path.append(os.path.abspath("../python/"))

...

from module1.processor import Processor

df = df.groupBy("col1").transformWithStateInPandas(
    statefulProcessor=Processor(),
    outputStructType="...",
    outputMode="append",
    timeMode="ProcessingTime",
)
...

On execution, it fails with:

STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE
...
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'module1'

Environment: Databricks Runtime 16.4

While searching for answers, I found this unanswered thread that sounds similar but is related to applyInPandasWithState.

I tried:

  • different cluster access modes: standard, shared
  • pip installing the Python files bundled as a wheel

2 REPLIES

stbjelcevic
Databricks Employee

Hi @VaDim ,

Thanks for the detailed context — you’ve run into a common gotcha with how Python code is serialized and executed for stateful streaming on Databricks.

Your sys.path.append only modifies the Python path on the driver node, but transformWithStateInPandas (like UDFs) executes its code on the worker nodes.

When Spark serializes your Processor object to send to the workers, it uses cloudpickle. When the workers try to deserialize it, they fail with ModuleNotFoundError: No module named 'module1' because that Python file doesn't exist on their file system or in their PYTHONPATH.
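To make the failure mode concrete, here is a simplified sketch (illustrative only, not Databricks internals). cloudpickle serializes classes defined in the notebook itself (the __main__ module) by value, but classes imported from a module only by reference, which is why the notebook-defined version works:

import cloudpickle
from module1.processor import Processor

# An imported class is pickled *by reference*, roughly as the string
# "module1.processor.Processor", not as the class definition itself.
payload = cloudpickle.dumps(Processor())

# On a worker that cannot import module1, deserialization fails:
# cloudpickle.loads(payload)  ->  ModuleNotFoundError: No module named 'module1'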

There are a couple of potential solutions here, one being slightly more involved than the other:

  1. Install the code as a wheel file (recommended best practice)
    I know you mentioned you tried this, but how it is installed matters. Running %pip install in a notebook cell is not enough, as that often installs only on the driver or in the notebook's isolated environment. You must install your package as a Cluster Library or a Job Library so that it is distributed to and installed on all worker nodes before your code runs.

    Steps: create a wheel file, upload it to DBFS or a UC Volume, then install it on your cluster; a minimal packaging sketch follows this list. (source)

  2. Use spark.sparkContext.addPyFile()
    This is a "lighter" solution if you don't want to build a full wheel file. This command tells Spark to ship your Python file to every worker.

    Make sure your module file is accessible, for example by uploading it to DBFS or using a Workspace path. In your notebook, before you define the streaming query, add the file to the SparkContext; a sketch follows this list. Note: you must use the full, absolute path. For Workspace files, prepend /Workspace.
    (source 1) (source 2)
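For option 1, here is a minimal packaging sketch, assuming the layout from the question (a setup.py placed next to the module1/ directory, i.e. at /Workspace/python/setup.py; the package name and version are illustrative):

# /Workspace/python/setup.py -- hypothetical location, matching the question's layout
from setuptools import setup, find_packages

setup(
    name="module1",            # illustrative name
    version="0.1.0",
    packages=find_packages(),  # requires a module1/__init__.py so the package is discovered
)

Build the wheel from the directory containing setup.py (for example with python -m build or pip wheel .), upload the resulting .whl to a UC Volume or DBFS, and attach it to the cluster as a library so every worker node gets it.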
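For option 2, a minimal sketch using the paths from the question (adjust to your layout). Note the import changes: a single .py shipped with addPyFile arrives on the workers as a top-level module, so the pickle reference must not carry the module1. package prefix:

import sys

# Driver side: make processor.py importable as a top-level module.
sys.path.append("/Workspace/python/module1")

# Ship the file to every worker; it lands in the workers' working
# directory, which is already on their sys.path.
spark.sparkContext.addPyFile("/Workspace/python/module1/processor.py")

# Import without the package prefix so the object is pickled by
# reference as "processor.Processor", which workers can resolve.
from processor import Processor

# Alternative: zip the package (the archive must contain module1/ at its
# top level; the path below is illustrative) to keep the original import:
# spark.sparkContext.addPyFile("/Workspace/python/module1_pkg.zip")
# from module1.processor import Processor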

VaDim
New Contributor III

This is no longer an issue; a patch release of Databricks Runtime 16.4 must have fixed it, as the original code now works without any changes.

Thanks.