DLT pipelines - sporadic ModuleNotFoundError

jtrousdale-lyb
New Contributor III

When we run DLT pipelines (which we deploy via DABs), we get a sporadic issue when attempting to install our bundle's wheel file.

First, in every DLT pipeline, we run as a first step a script that looks like the following:

import subprocess as sp
from importlib.util import find_spec as importlib_find_spec
from pathlib import Path

from databricks.sdk.runtime import dbutils, spark

# Only install the bundle wheel if our package is not already importable.
if not importlib_find_spec('my_package'):
    artifact_path = spark.conf.get('bundle.artifactPath')

    if not artifact_path:
        msg = 'Bundle artifact path not found in Spark configuration.'
        raise ValueError(msg)

    # Locate the versioned wheel that the bundle deploys under <artifact_path>/.internal.
    wheel_path = next(
        (Path(artifact_path) / '.internal').glob(
            'my_package-*-py3-none-any.whl',
        ),
    )
    sp.check_call(  # noqa: S603
        [  # noqa: S607
            'pip',
            'install',
            str(wheel_path),
        ],
    )

    # Restart Python so the newly installed package becomes importable.
    dbutils.library.restartPython()

Here, bundle.artifactPath is passed in from the bundle via the pipeline configuration.

A large percentage of the time, this works perfectly. From that point on, we can run code that has access to both our package (here called "my_package") and the third-party dependencies that the wheel pulls in from the project dependencies declared in pyproject.toml.

However, every now and then, this fails during the DLT pipeline's "Initializing" stage with an error like the following:

pyspark.errors.exceptions.captured.AnalysisException: Traceback (most recent call last):
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-6584cd7b-1c47-4509-891d-918d38e18089/lib/python3.12/site-packages/my_package/base_pipeline_config.py", line 293, in _fcn
    return source.load()
           ^^^^^^^^^^^^^

pyspark.errors.exceptions.captured.AnalysisException: [PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'my_package'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'my_package'

 SQLSTATE: 38000

Usually it works after a re-run, though sometimes it takes several re-runs before it finally succeeds. And then sometimes it works on the first try several times in a row.

We'd really like to resolve this in a way that lets us keep installing our wheel (along with its external dependencies) without this flakiness forcing so many retries.

We also thought about updating sys.path instead, but that doesn't handle installing the third-party dependencies. It seems like the Python runtime somehow gets out of sync.
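
For illustration, the sys.path approach we considered would look roughly like this (a sketch only, with a placeholder path rather than our real bundle layout). It can make my_package importable, but it does nothing for the third-party dependencies that the wheel would normally install:

import sys
from importlib.util import find_spec

# Sketch only: expose the deployed bundle sources via sys.path instead of
# pip-installing the wheel. The path below is a placeholder, not our real layout.
source_path = '/Workspace/path/to/bundle/files/src'
if source_path not in sys.path:
    sys.path.append(source_path)

# my_package may now resolve, but the external dependencies it needs (declared in
# pyproject.toml and normally installed from the wheel) are still missing.
print(find_spec('my_package') is not None)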


6 REPLIES

WiliamRosa
New Contributor III

If you're encountering intermittent ModuleNotFoundError when your DLT pipeline tries to install your asset bundle’s wheel file, this typically points to inconsistencies in how your dependencies are packaged or where they’re being deployed. Common culprits include missing dependencies in your setup.py, improper wheel creation, or failure to include the package in the pipeline deployment context. To resolve this, ensure your wheel includes all required modules, verify that the wheel is properly deployed using dbx or pipeline settings, and test that your dependencies can be imported within the DLT pipeline's runtime. If issues persist, consider mocking imports during development to validate syntax and package structure before running a full pipeline deployment.
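
As a quick sanity check along those lines, something like the following can confirm that the package and its dependencies resolve inside the pipeline's runtime (a generic sketch; the module names are examples only):

from importlib.util import find_spec

# Generic sketch: verify that the bundle package and a couple of its declared
# dependencies are importable in the current Python environment.
for module in ('my_package', 'requests', 'pydantic'):  # example names only
    status = 'OK' if find_spec(module) else 'MISSING'
    print(f'{module}: {status}')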

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

@WiliamRosa Unfortunately, none of that is relevant. The packaging does not change between runs that succeed and runs that fail, and I'm confident the packaging is correct.

Moreover, if you read the post more carefully, you'll see that it's the package itself (which I've called "my_package" for demonstration purposes), not a dependency, that it fails to find. And to even reach the point where the error occurs, the package must already have been found once; otherwise the .load() method being called, which is part of the module code packed into the wheel, would not be available.

It seems to be something strange and erratic related to how the Spark Structured Streaming source is dispatched into the Spark cluster.

In fact, this is an important detail I failed to mention: we are using a custom Spark Structured Streaming source implementation (a class extending SimpleDataSourceStreamReader).
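
For reference, the general shape is roughly the following (a heavily simplified sketch rather than our real code; the class names, format name, and schema are made up for illustration). The relevant detail, visible in the traceback above, is that the data source instance defined in my_package is cloudpickled and re-created on the cluster, and that appears to be where the ModuleNotFoundError surfaces:

from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader


class MyStreamReader(SimpleDataSourceStreamReader):
    """Simplified stand-in for the reader that lives inside my_package."""

    def initialOffset(self):
        # Offsets are plain dicts in the Python data source API.
        return {'position': 0}

    def read(self, start):
        position = start['position']
        # Emit one dummy row per microbatch; the real reader calls an external system.
        rows = iter([(position,)])
        return rows, {'position': position + 1}


class MyDataSource(DataSource):
    """Simplified stand-in for the DataSource subclass packaged in our wheel."""

    @classmethod
    def name(cls):
        return 'my_source'  # made-up format name

    def schema(self):
        return 'position INT'

    def simpleStreamReader(self, schema):
        return MyStreamReader()


# Registered and consumed roughly like:
#   spark.dataSource.register(MyDataSource)
#   df = spark.readStream.format('my_source').load()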

Hi @jtrousdale-lyb ,

To me it looks like a race condition issue. That would explain why it sometimes works and sometimes doesn't. One question: is there a specific reason for installing wheels this way? DABs support installing wheels out of the box:

resources:
  jobs:
    my_job:
      # ...
      tasks:
        - task_key: my_task
          # ...
          libraries:
            - whl: ./my-wheel-0.1.0.whl
            - whl: /Workspace/Shared/Libraries/my-wheel-0.0.1-py3-none-any.whl
            - whl: /Volumes/main/default/my-volume/my-wheel-0.1.0.whl

@szymon_dybczak When we run jobs, we use wheel tasks and never hit this issue.

This is specifically about DLT pipelines, as the title states. With DLT pipelines, you can only specify file or notebook dependencies.

Here's an example of the pipeline

 

resources:
  pipelines:
    my_pipeline:
      name: my_pipeline
      catalog: ${var.bronze_catalog}
      schema: ${resources.schemas.bronze.name}
      configuration:
        static_config_object: "my_package.schema.my_pipeline:PIPELINE_STATIC_CONFIG"
        bronze_catalog: ${var.bronze_catalog}
        silver_catalog: ${var.silver_catalog}
        gold_catalog: ${var.gold_catalog}
        bronze_schema: ${resources.schemas.bronze.name}
        silver_schema: ${resources.schemas.silver.name}
        gold_schema: ${resources.schemas.gold.name}
        bundle.artifactPath: "${workspace.artifact_path}"
        streaming_initial_datetime: ""
      serverless: true
      photon: true
      continuous: false
      libraries:
        - file:
            path: ../../src/my_package/pipelines/p00_install_wheel.py
        - file:
            path: ../../src/my_package/pipelines/p10_my_package_streaming_pipeline.py

      permissions:
        - service_principal_name: ${var.service_principal_name}
          level: CAN_MANAGE
        - group_name: MY_GROUP
          level: CAN_RUN
        - group_name: MY_OTHER_GROUP
          level: CAN_RUN

We do it this way so that we can have one consistent way of packaging our code (the DAB wheel artifact).

In the end, the fix was quite simple. Recently, a new field was added to pipelines for specifying details of the DLT environment.

Note the environment.dependencies subfield and the use of "${workspace.artifact_path}/.internal/my_package-*-py3-none-any.whl" to depend on the dynamically versioned bundle wheel file. This works perfectly, and it runs before any of the Spark cluster nodes start processing, thereby avoiding the race condition of the previous approach.

 

resources:
  pipelines:
    my_pipeline:
      name: my_pipeline
      catalog: ${var.bronze_catalog}
      schema: ${resources.schemas.bronze.name}
      configuration:
        bronze_catalog: ${var.bronze_catalog}
        silver_catalog: ${var.silver_catalog}
        gold_catalog: ${var.gold_catalog}
        bronze_schema: ${resources.schemas.bronze.name}
        silver_schema: ${resources.schemas.silver.name}
        gold_schema: ${resources.schemas.gold.name}
      serverless: true
      photon: true
      continuous: false
      environment:
        dependencies:
          - "${workspace.artifact_path}/.internal/my_package-*-py3-none-any.whl"
      libraries:
        - file:
            path: ../../src/my_package/pipelines/p10_my_package_streaming_pipeline.py

      permissions:
        - service_principal_name: ${var.service_principal_name}
          level: CAN_MANAGE

 Here's the documentation: https://docs.databricks.com/api/workspace/pipelines/create#environment

One caveat: It can't handle spaces in the artifact path currently.

Hi @jtrousdale-lyb ,

Great, thanks for sharing the solution with us!
