Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

ModuleNotFound when running DLT pipeline

hiryucodes
New Contributor III

My new DLT pipeline gives me a ModuleNotFoundError when I try to request data from an API. For some more context, I develop in my local IDE and then deploy to Databricks using asset bundles. The pipeline runs fine if I write a static DataFrame instead of making the API request.

Here's my pipeline code:

import json
import os

import dlt

# MyDataSource and build_query_params are imported from the project's src package.


@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource").option("api_key", api_key).option("params", json.dumps(params)).load()
    )

    return response_df

import json

import requests
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType


class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema  # schema object defined elsewhere in the project

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https://my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError:
                raise ValueError("Invalid JSON format for params")
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        # Page through the API until a partial page signals the end of the data.
        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        while next_page:
            self.params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            for record in data:
                result.append(record)

            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        # Yield every accumulated record, not just the last page.
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        return [InputPartition(0)]

The error I'm getting:

pyspark.errors.exceptions.captured.AnalysisException: [PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'


Alberto_Umana
Databricks Employee

Hi @hiryucodes,

Ensure that the directory structure of your project is correctly set up. The module 'src' should be in a directory that is part of the Python path. For example, if your module is in a directory named 'src', the directory structure should look something like this:

/Workspace/Repos/your_username/your_project/
├── src/
│   ├── __init__.py
│   └── your_module.py
└── your_pipeline_notebook.py

You can also try to modify the sys.path within your DLT pipeline to include the directory where the 'src' module is located. This can be done by adding the following code at the beginning of your pipeline notebook:

import sys
sys.path.append('/Workspace/Repos/your_username/your_project')  # the directory that contains src/
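The intent is that the pipeline file can then import and register the data source through a normal package import. A minimal sketch with placeholder module and class names (as the follow-up below notes, this helps driver-side imports, but not code that has to be deserialized on the workers):

from src.your_module import MyDataSource  # hypothetical module and class names

spark.dataSource.register(MyDataSource)
df = spark.read.format("mydatasource").load()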

 

Hello, thank you for the reply. I've verified both of those points. The pipeline works if I just use regular Python functions, but it gives the error if I use a custom data source or a UDF. My guess is that both of these use a different Spark context than the one set up at the start of the pipeline, so my sys.path.append no longer takes effect?

hiryucodes
New Contributor III

The only way I was able to make this work was to have the custom data source class code in the same file as the DLT pipeline. Like this:

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https://my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError:
                raise ValueError("Invalid JSON format for params")
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        while next_page:
            params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            for record in data:
                result.append(record)

            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        return [InputPartition(0)]

@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource").option("api_key", api_key).option("params", json.dumps(params)).load()
    )

    return response_df

This is not practical at all, as I also want to use the custom data source in other DLT pipelines, which I won't be able to do unless I duplicate the code every time.

dvlopr
New Contributor III
Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/worker/create_data_source.py", line 82, in main
    data_source_cls = read_command(pickleSer, infile)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/worker_util.py", line 71, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
    raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

@hiryucodes were you ever able to come up with a solution to this issue? I've been writing a custom data source/reader and trying to validate it within a notebook. I get the same issue when I try to load the data source by importing it from the asset bundle.

That said, I can make it work by defining everything in place. I'm not sure the issue is the Spark context; it seems more like an issue with the cloudpickle serialization that is applied to the class (and then promptly reloaded when the task is executed).
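A minimal sketch (not from this thread) of why the definition location matters: cloudpickle serializes classes defined in an importable module by reference, i.e. as module path plus class name, so the process that unpickles them must be able to import that module, whereas classes defined inline in the notebook's __main__ are serialized by value and need no import on the worker.

import cloudpickle


class InlineSource:  # defined in __main__, e.g. directly in a notebook cell
    pass


payload = cloudpickle.dumps(InlineSource)  # the full class definition travels with the pickle
restored = cloudpickle.loads(payload)      # works in any process, no import needed
print(restored.__name__)

# A class imported from a project module, e.g. `from src.datasource import MyDataSource`
# (hypothetical path), is pickled only as a reference to "src.datasource.MyDataSource".
# Unpickling it on an executor that cannot import `src` raises
# ModuleNotFoundError: No module named 'src', which matches the traceback above.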

AFH
New Contributor II

Same problem here!

dvlopr
New Contributor III

@AFH so in the end we did find a "solution" to the notebook issue. The issue occurs on serverless compute; the only way to make it work for validation is to spin up a compute cluster and connect your notebook to it.

In the context of DLT pipelines, we hit this issue as well. We tried to install our DAB via an initial "install" job, but that ended up causing race conditions. We initially wanted to install the wheel via `libraries.whl` in the YAML specification, but decided against it since that option is deprecated.

After a while I found https://docs.databricks.com/api/workspace/pipelines/create#environment, so ultimately our pipelines look like this:

resources:
  pipelines:
    my_pipeline_name:
      # ...
      environment:
        dependencies:
          - '/path/to/where/dab/is/built/and/loaded/in/unity/catalog'

This apparently works with wildcards, so for example I could specify `- '/my/path/my_cool_wheel_*_py3-none-any.whl'` and it installed correctly. Since then, our pipelines have been using our custom structured streaming source reliably.
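For completeness, a rough sketch of what a pipeline source file can look like once the wheel is installed through the pipeline environment; the package and module names below are placeholders, not the actual project layout:

import json
import os

import dlt

# Hypothetical import path: wherever the installed wheel exposes the shared code.
from my_project.datasources import MyDataSource, build_query_params


@dlt.table(name="my_table", table_properties={"quality": "bronze"})
def dlt_ingestion():
    # The data source implementation is shared via the wheel instead of being
    # copied into every pipeline file.
    spark.dataSource.register(MyDataSource)

    params = build_query_params()
    return (
        spark.read.format("mydatasource")
        .option("api_key", os.getenv("API_KEY"))
        .option("params", json.dumps(params))
        .load()
    )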