Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

delta live table udf not known when defined in python module

david3
New Contributor III

Hi 

I have the problem that my "module" is not found when it is used in a user-defined function; the precise error message is posted below. I have a repo structure as follows:

 

 

analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   ├── testdata
│   │   ├── configuration.csv
│   │   ├── input.avro
│   │   └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
├── tests
│   ├── __init__.py
│   └── test_transformations.py

 

 

In the delta_live_tables folder I have a notebook that does something like this:

 

 

import sys
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')

import pipeline

config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)

 

 

In pipeline.define_ingestion_pipeline I define a bunch of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations.

 

 

import dlt

from transformations import *
from coordinate_transformation import apply_coordinate_transform

# ...

def define_ingestion_pipeline(spark, config):
    # ...
    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)
    # ...
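For reference, a filled-in version of one such table definition might look like the sketch below; the table name, comment, and path are illustrative, and it assumes config behaves like a dict with a storage_path entry:

import dlt
from transformations import transform_data  # illustrative explicit import


def define_ingestion_pipeline(spark, config):
    @dlt.table(
        name="ingested_measurements",                             # illustrative table name
        comment="Ingested measurements with transformations applied",
        path=config["storage_path"] + "/ingested_measurements",   # assumes a dict-like config
    )
    def ingested_measurements():
        data = dlt.read_stream("other")  # upstream table defined elsewhere in the pipeline
        return transform_data(data)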

 

 

Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:

 

 

import pandas as pd


def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf


def apply_coordinate_transform(data):
    # ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
               .applyInPandas(coordinate_transform, schema=schema)

    return data

 

 

Apparently coordinate_transformation.py is not available, but why?

Error message:

  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'

(Unrelated: could somebody point me to a way to multiply a 3x3 matrix onto an Nx3 DataFrame, resulting in an Nx3 DataFrame?)
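For what it's worth, once the Nx3 block of coordinates is in a pandas DataFrame (for example inside applyInPandas), this is a plain NumPy matrix product: each row v becomes R @ v, so the batched form is V @ R.T. A minimal sketch with made-up column names and an illustrative matrix:

import numpy as np
import pandas as pd

# Illustrative 3x3 transformation matrix (90-degree rotation around z)
R = np.array([[0.0, -1.0, 0.0],
              [1.0,  0.0, 0.0],
              [0.0,  0.0, 1.0]])

# Nx3 DataFrame of coordinates; column names are just examples
df = pd.DataFrame({"x": [1.0, 2.0], "y": [0.0, 1.0], "z": [3.0, 4.0]})

# (N, 3) @ (3, 3) -> (N, 3)
df[["x", "y", "z"]] = df[["x", "y", "z"]].to_numpy() @ R.T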

Regards and thanks

David

 

2 ACCEPTED SOLUTIONS

I solved this by defining the applyInPandas transformation inside the function it's used in.

 


def apply_coordinate_transform(data):
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf

    # ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
               .applyInPandas(coordinate_transform, schema=schema)

    return data
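A more complete, runnable version of this pattern might look like the sketch below. The column names, grouping key, and the identity rotation are illustrative stand-ins for the real logic in get_coordinate_transformation; the likely reason the nested definition helps is that cloudpickle serializes an inner function by value, so the executors no longer have to import coordinate_transformation from a repo path that was only appended to sys.path on the driver.

import numpy as np
import pandas as pd
from pyspark.sql import DataFrame


def apply_coordinate_transform(data: DataFrame) -> DataFrame:
    # Nested function: pickled by value, so the workers don't need to import
    # the coordinate_transformation module.
    def coordinate_transform(group_keys, pdf: pd.DataFrame) -> pd.DataFrame:
        # Illustrative stand-in for get_coordinate_transformation(group_keys[0])
        rotation = np.eye(3)
        # Apply the 3x3 matrix to every row of the x/y/z columns
        pdf[["x", "y", "z"]] = pdf[["x", "y", "z"]].to_numpy() @ rotation.T
        return pdf

    schema = data.schema  # output schema must match the returned pandas frame
    return (
        data.groupBy("serialnumber")
            .applyInPandas(coordinate_transform, schema=schema)
    )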

 

 


david3
New Contributor III

Hi,

Yes, I discovered three working possibilities:

  1. Define the pandas function as an inline (nested) function, as pointed out above.
  2. Define the pandas function in the same script that is imported as a "library" in the DLT config (see the sketch after this list):

     libraries:
       - notebook:
           path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py

  3. Install your Python library as a whl on the cluster.
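For option 2, the equivalent entry in the pipeline's JSON settings would look roughly like the sketch below; the pipeline name and notebook path are placeholders for this repo layout, and the module with the UDFs would live in (or be imported by) that notebook. For option 3, the wheel built from pyproject.toml can be installed as a cluster library, or with %pip install at the top of a pipeline notebook if the runtime supports it.

{
  "name": "data_ingestion",
  "libraries": [
    {
      "notebook": {
        "path": "/Repos/<user>/analytics-data-pipelines/delta_live_tables/data_ingestion"
      }
    }
  ]
}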

Regards

David


4 REPLIES

JamesDallimore
New Contributor III

Hi David, I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?

Carlose
New Contributor II

Hello David. Same as James: "I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?"


