<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: ModuleNotFound when running DLT pipeline in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/129813#M48609</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/181007"&gt;@AFH&lt;/a&gt; in the end we found a "solution" to the notebook issue. The issue occurs on serverless compute: the only way to make validation work is to spin up a non-serverless compute cluster and attach your notebook to it.&lt;/P&gt;&lt;P&gt;We hit the same issue in DLT pipelines. We tried to install our DAB (Databricks Asset Bundle) via an initial "install" job, but that led to race conditions. We also considered installing the wheel via `libraries.whl` in the YAML specification, but decided against it because that option is deprecated.&lt;/P&gt;&lt;P&gt;Eventually I found&amp;nbsp;&lt;A href="https://docs.databricks.com/api/workspace/pipelines/create#environment" target="_blank"&gt;https://docs.databricks.com/api/workspace/pipelines/create#environment&lt;/A&gt;. Our pipelines now look like this:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;resources:
  pipelines:
    my_pipeline_name:
      # ...
      environment:
        dependencies:
          - '/path/to/where/dab/is/built/and/loaded/in/unity/catalog'&lt;/LI-CODE&gt;&lt;P&gt;This apparently also works with wildcards: for example, I could specify `- '/my/path/my_cool_wheel_*_py3-none-any.whl'` and the wheel was installed. Since then, our pipelines have run our custom Structured Streaming code reliably.&lt;/P&gt;</description>
    <pubDate>Tue, 26 Aug 2025 12:58:47 GMT</pubDate>
    <dc:creator>dvlopr</dc:creator>
    <dc:date>2025-08-26T12:58:47Z</dc:date>
    <item>
      <title>ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109334#M43278</link>
      <description>&lt;P&gt;My new DLT pipeline raises a ModuleNotFoundError when I try to request data from an API. For context, I develop in my local IDE and deploy to Databricks using asset bundles. The pipeline runs fine if I write a static DataFrame instead of making the API request.&lt;/P&gt;&lt;P&gt;Here's my pipeline code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import json
import os

import dlt

# MyDataSource (from the bundle's src package) and build_query_params are
# imported elsewhere; those imports are not shown in this post.


@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    # Register the custom Python data source, then read through it by its short name.
    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource")
        .option("api_key", api_key)
        .option("params", json.dumps(params))
        .load()
    )

    return response_df&lt;/LI-CODE&gt;&lt;P&gt;And the custom data source:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import json

import requests
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType


class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema  # StructType defined elsewhere in the module (not shown)

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https:/my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError as err:
                raise ValueError("Invalid JSON format for params") from err
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        while next_page:
            # Request the current page.
            self.params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            result.extend(data)

            # A full page means there may be more records to fetch.
            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        # Emit one row per record collected across all pages.
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        return [InputPartition(0)]&lt;/LI-CODE&gt;&lt;P&gt;The error I'm getting:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;pyspark.errors.exceptions.captured.AnalysisException: [PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)&lt;/LI-CODE&gt;</description>
      <pubDate>Fri, 07 Feb 2025 01:44:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109334#M43278</guid>
      <dc:creator>hiryucodes</dc:creator>
      <dc:date>2025-02-07T01:44:55Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109436#M43317</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/148184"&gt;@hiryucodes&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Ensure that the directory structure of your project is set up correctly. The 'src' package should be in a directory that is part of the Python path. For example, if your module is in a directory named 'src', the directory structure should look something like this:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;/Workspace/Repos/your_username/your_project/
├── src/
│   ├── __init__.py
│   └── your_module.py
└── your_pipeline_notebook.py&lt;/CODE&gt;&lt;/PRE&gt;
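&lt;P&gt;A quick local check of the layout above (a minimal sketch in plain Python, not Databricks-specific; the temporary directory stands in for your repo, and &lt;CODE&gt;your_module&lt;/CODE&gt; and its &lt;CODE&gt;hello&lt;/CODE&gt; function are hypothetical placeholders): for an import of the form &lt;CODE&gt;from src.your_module import ...&lt;/CODE&gt;, the entry on &lt;CODE&gt;sys.path&lt;/CODE&gt; has to be the directory that contains &lt;CODE&gt;src/&lt;/CODE&gt;, not &lt;CODE&gt;src/&lt;/CODE&gt; itself.&lt;/P&gt;

```python
# Minimal local sketch: rebuild the placeholder layout in a temp directory and
# check which sys.path entry makes `from src.your_module import ...` resolve.
import importlib
import sys
import tempfile
from pathlib import Path

project = Path(tempfile.mkdtemp()) / "your_project"
(project / "src").mkdir(parents=True)
(project / "src" / "__init__.py").write_text("")
# Hypothetical module body, just enough to prove the import worked.
(project / "src" / "your_module.py").write_text(
    "def hello():\n    return 'hello from src'\n"
)

# Put the directory that CONTAINS src/ on the path. Adding .../your_project/src
# would make `import your_module` work, but not `import src.your_module`.
# insert(0, ...) keeps an unrelated `src` elsewhere on the path from shadowing it.
sys.path.insert(0, str(project))
importlib.invalidate_caches()  # new files were created after interpreter start

from src.your_module import hello  # noqa: E402

print(hello())
```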
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;You can also try to modify the &lt;CODE&gt;sys.path&lt;/CODE&gt; within your DLT pipeline to include the directory where the 'src' module is located. This can be done by adding the following code at the beginning of your pipeline notebook:&lt;/SPAN&gt;&lt;/P&gt;
&lt;DIV class="gb5fhw2"&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python _1t7bu9hb hljs language-python gb5fhw3"&gt;&lt;SPAN class="hljs-keyword"&gt;import&lt;/SPAN&gt; sys
sys.path.append(&lt;SPAN class="hljs-string"&gt;'/Workspace/Repos/your_username/your_project'&lt;/SPAN&gt;)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;/DIV&gt;
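&lt;P&gt;One caveat: a &lt;CODE&gt;sys.path&lt;/CODE&gt; change made at runtime only applies to the interpreter that runs it. This is a minimal sketch in plain Python, assuming a subprocess is a fair stand-in for a Spark Python worker and using stdlib &lt;CODE&gt;pickle&lt;/CODE&gt; in place of cloudpickle; the module &lt;CODE&gt;src.my_source&lt;/CODE&gt; is hypothetical. It shows that a class from an importable module is pickled by reference, so the process that unpickles it must be able to import &lt;CODE&gt;src&lt;/CODE&gt; on its own. cloudpickle, which PySpark uses, also pickles classes from importable modules by reference, while classes defined in &lt;CODE&gt;__main__&lt;/CODE&gt; are pickled by value.&lt;/P&gt;

```python
# Minimal local sketch (plain Python, no Spark): a class from an importable module
# is pickled by reference, so the unpickling process must import that module itself.
import importlib
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical package standing in for the bundle's src/ code.
project = Path(tempfile.mkdtemp())
(project / "src").mkdir()
(project / "src" / "__init__.py").write_text("")
(project / "src" / "my_source.py").write_text("class MyDataSource:\n    pass\n")

# Runtime path change: visible to THIS interpreter only.
sys.path.insert(0, str(project))
importlib.invalidate_caches()
from src.my_source import MyDataSource  # noqa: E402

# Stores the reference "src.my_source" + "MyDataSource", not the class body.
payload = pickle.dumps(MyDataSource)

# A fresh interpreter (standing in for a Python worker) starts from its own default
# sys.path, so resolving the reference fails. An empty cwd keeps any local src/ out.
result = subprocess.run(
    [sys.executable, "-c", "import pickle, sys; pickle.loads(sys.stdin.buffer.read())"],
    input=payload,
    capture_output=True,
    cwd=tempfile.mkdtemp(),
)
print("exit code:", result.returncode)
print(result.stderr.decode().strip().splitlines()[-1])
```

&lt;P&gt;The child process exits with a non-zero status, and the last line of its stderr is &lt;CODE&gt;ModuleNotFoundError: No module named 'src'&lt;/CODE&gt;, the same error as in the tracebacks in this thread.&lt;/P&gt;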
</description>
      <pubDate>Fri, 07 Feb 2025 17:29:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109436#M43317</guid>
      <dc:creator>Alberto_Umana</dc:creator>
      <dc:date>2025-02-07T17:29:33Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109487#M43329</link>
      <description>&lt;P&gt;Hello, thank you for the reply. I've verified both points. The pipeline works if I use plain Python functions, but it fails with this error when I use a custom data source or a UDF. My guess is that both of these run in a different Spark context than the one the pipeline sets up at the start, so my sys.path.append no longer takes effect?&lt;/P&gt;</description>
      <pubDate>Sat, 08 Feb 2025 12:46:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109487#M43329</guid>
      <dc:creator>hiryucodes</dc:creator>
      <dc:date>2025-02-08T12:46:32Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109592#M43349</link>
      <description>&lt;P&gt;The only way I could make this work was to put the custom data source classes in the same file as the DLT pipeline, like this:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import json
import os

import dlt
import requests
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType


class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)


class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https:/my/api/url"
        self.timeout = int(self.options.get("timeout", 600))

        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError as err:
                raise ValueError("Invalid JSON format for params") from err
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}

        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        while next_page:
            # Request the current page.
            self.params["page"] = page

            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            result.extend(data)

            # A full page means there may be more records to fetch.
            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False

        # Emit one row per record collected across all pages.
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        return [InputPartition(0)]

@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()

    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource").option("api_key", api_key).option("params", json.dumps(params)).load()
    )

    return response_df&lt;/LI-CODE&gt;&lt;P&gt;This isn't practical, because I also want to use the custom data source in other DLT pipelines, which I can't do without duplicating the code every time.&lt;/P&gt;</description>
      <pubDate>Mon, 10 Feb 2025 10:17:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/109592#M43349</guid>
      <dc:creator>hiryucodes</dc:creator>
      <dc:date>2025-02-10T10:17:22Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/125770#M47524</link>
      <description>&lt;LI-CODE lang="markup"&gt;Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/worker/create_data_source.py", line 82, in main
    data_source_cls = read_command(pickleSer, infile)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/worker_util.py", line 71, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
    raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/LI-CODE&gt;&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/148184"&gt;@hiryucodes&lt;/a&gt;&amp;nbsp;were you ever able to find a solution to this issue? I've been writing a custom data source/reader and trying to validate it within a notebook, and I get the same error when I load the source by importing it from the asset bundle.&lt;/P&gt;&lt;P&gt;That said, I can make it work by defining everything in place. I'm not sure the issue is the Spark context; it may be more an issue with cloudpickle, which serializes the class (and then reloads it when the task executes).&lt;/P&gt;</description>
      <pubDate>Sun, 20 Jul 2025 13:41:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/125770#M47524</guid>
      <dc:creator>dvlopr</dc:creator>
      <dc:date>2025-07-20T13:41:27Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/129727#M48591</link>
      <description>&lt;P&gt;Same problem here!&lt;/P&gt;</description>
      <pubDate>Tue, 26 Aug 2025 03:16:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/129727#M48591</guid>
      <dc:creator>AFH</dc:creator>
      <dc:date>2025-08-26T03:16:55Z</dc:date>
    </item>
    <item>
      <title>Re: ModuleNotFound when running DLT pipeline</title>
      <link>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/129813#M48609</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/181007"&gt;@AFH&lt;/a&gt; in the end we found a "solution" to the notebook issue. The issue occurs on serverless compute: the only way to make validation work is to spin up a non-serverless compute cluster and attach your notebook to it.&lt;/P&gt;&lt;P&gt;We hit the same issue in DLT pipelines. We tried to install our DAB (Databricks Asset Bundle) via an initial "install" job, but that led to race conditions. We also considered installing the wheel via `libraries.whl` in the YAML specification, but decided against it because that option is deprecated.&lt;/P&gt;&lt;P&gt;Eventually I found&amp;nbsp;&lt;A href="https://docs.databricks.com/api/workspace/pipelines/create#environment" target="_blank"&gt;https://docs.databricks.com/api/workspace/pipelines/create#environment&lt;/A&gt;. Our pipelines now look like this:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;resources:
  pipelines:
    my_pipeline_name:
      # ...
      environment:
        dependencies:
          - '/path/to/where/dab/is/built/and/loaded/in/unity/catalog'&lt;/LI-CODE&gt;&lt;P&gt;This apparently also works with wildcards: for example, I could specify `- '/my/path/my_cool_wheel_*_py3-none-any.whl'` and the wheel was installed. Since then, our pipelines have run our custom Structured Streaming code reliably.&lt;/P&gt;</description>
      <pubDate>Tue, 26 Aug 2025 12:58:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/modulenotfound-when-running-dlt-pipeline/m-p/129813#M48609</guid>
      <dc:creator>dvlopr</dc:creator>
      <dc:date>2025-08-26T12:58:47Z</dc:date>
    </item>
  </channel>
</rss>

