Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

ModuleNotFound when running DLT pipeline

hiryucodes
New Contributor III

My new DLT pipeline gives me a ModuleNotFoundError when I try to request data from an API. For some more context, I develop in my local IDE and then deploy to Databricks using asset bundles. The pipeline runs fine if I write a static DataFrame instead of making the request to the API.

Here's my pipeline code:

 

import json
import os

import dlt


@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource").option("api_key", api_key).option("params", json.dumps(params)).load()
    )

    return response_df

import json

import requests
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType


class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https://my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError:
                raise ValueError("Invalid JSON format for params")
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        # Page through the API until a partial page signals the last page.
        while next_page:
            self.params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            for record in data:
                result.append(record)

            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        # Yield every accumulated record, not just the records from the last page.
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        return [InputPartition(0)]

 

The error I'm getting:

 

pyspark.errors.exceptions.captured.AnalysisException: [PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)

3 REPLIES

Alberto_Umana
Databricks Employee

Hi @hiryucodes,

Ensure that the directory structure of your project is correctly set up. The module 'src' should be in a directory that is on the Python path. For example, if your module lives in a directory named 'src', the structure should look something like this:

/Workspace/Repos/your_username/your_project/
├── src/
│   ├── __init__.py
│   └── your_module.py
└── your_pipeline_notebook.py

You can also try to modify the sys.path within your DLT pipeline to include the directory where the 'src' module is located. This can be done by adding the following code at the beginning of your pipeline notebook:

 

import sys
sys.path.append('/Workspace/Repos/your_username/your_project/src')
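
One small follow-up on that snippet (paths and names are placeholders): if the pipeline imports the module as src.your_module, the directory to append is the project root (the parent of src/) rather than src/ itself, for example:

import sys

# Append the project root so that `import src.your_module` can be resolved.
sys.path.append('/Workspace/Repos/your_username/your_project')

from src.your_module import MyDataSource  # placeholder module/class names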

 

Hello, thank you for the reply. I've verified both of those points. The pipeline works if I just use regular Python functions, but it gives the error if I use a custom data source or a UDF. My guess is that both of these run in a different Spark context than the one set up at the start of the pipeline, so my sys.path.append doesn't take effect anymore?
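
One workaround that might be worth trying here (a sketch, not verified on DLT; src.my_datasource below is a placeholder for wherever MyDataSource is defined): custom data sources and UDFs are serialized with cloudpickle and rebuilt on the executors, and classes are pickled by reference by default, so the executor tries to import src and fails regardless of the driver-side sys.path. Recent cloudpickle versions can be told to pickle a module by value instead, so the executors no longer need src on their import path:

from pyspark import cloudpickle  # the cloudpickle copy PySpark uses for serialization

import src.my_datasource  # placeholder: the module that defines MyDataSource
from src.my_datasource import MyDataSource

# Serialize everything defined in src.my_datasource by value instead of by reference.
cloudpickle.register_pickle_by_value(src.my_datasource)

spark.dataSource.register(MyDataSource)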

hiryucodes
New Contributor III

The only way I was able to make this work was to have the custom data source class code in the same file as the DLT pipeline. Like this:

import json
import os

import dlt
import requests
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType


class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https://my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError:
                raise ValueError("Invalid JSON format for params")
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        # Page through the API until a partial page signals the last page.
        while next_page:
            self.params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            for record in data:
                result.append(record)

            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        # Yield every accumulated record, not just the records from the last page.
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        return [InputPartition(0)]

@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource").option("api_key", api_key).option("params", json.dumps(params)).load()
    )

    return response_df

This is not practical at all, as I also want to use the custom data source in other DLT pipelines, which I won't be able to do unless I duplicate the code every time.
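
Two ways that might avoid the duplication (both sketches, not verified on DLT; names below are placeholders): build the src package into a wheel with the asset bundle and install it on the pipeline, so the module is importable on the executors as well as the driver, or keep the classes in one shared module and expose a small registration helper that applies the by-value pickling workaround above, so each pipeline needs only an import and one call:

# src/datasource_registry.py -- hypothetical shared helper reused by every pipeline
from pyspark import cloudpickle


def register_my_datasource(spark):
    """Register MyDataSource on this SparkSession, pickling its defining module
    by value so the executors don't need 'src' on their import path."""
    import src.my_datasource as mod  # placeholder: the module that defines MyDataSource

    cloudpickle.register_pickle_by_value(mod)
    spark.dataSource.register(mod.MyDataSource)

Each pipeline would then call register_my_datasource(spark) once before reading with spark.read.format("mydatasource").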
