Data Engineering
Delta Live Table UDF not known when defined in Python module

david3
New Contributor III

Hi

I have the problem that my "module" is not found when it is used in a user-defined function; the precise error message is posted below. My repo structure is as follows:

 

 

analytics-data-pipelines
├── analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   └── testdata
│       ├── configuration.csv
│       ├── input.avro
│       └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
└── tests
    ├── __init__.py
    └── test_transformations.py

 

 

In the delta_live_tables folder I have a notebook that does something like this:

 

 

import sys

# Make the package's modules importable from the repo checkout
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')

import pipeline

# mode and avro_raw_data are defined elsewhere in the notebook
config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)

 

 

In pipeline.define_ingestion_pipeline I define a number of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations:

 

 

import dlt

from transformations import *
from coordinate_transformation import apply_coordinate_transform

# ...

def define_ingestion_pipeline(spark, config):
    ...
    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)
    ...

 

 

Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:

 

 

import pandas as pd

def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf

def apply_coordinate_transform(data):
    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
               .applyInPandas(coordinate_transform, schema=schema)

    return data

 

 

Apparently coordinate_transformation.py is not available to the workers, but why?

Error message:

  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'

(Unrelated: could somebody point out a way to multiply a 3x3 matrix onto an Nx3 DataFrame, resulting in an Nx3 DataFrame?)
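A minimal NumPy/pandas sketch of one way to do that (the matrix R and the column names x, y, z are hypothetical):

import numpy as np
import pandas as pd

# Hypothetical 3x3 transformation matrix and Nx3 data
R = np.array([[1.0, 0.0, 0.0],
              [0.0, 0.0, -1.0],
              [0.0, 1.0, 0.0]])
pdf = pd.DataFrame(np.random.rand(5, 3), columns=['x', 'y', 'z'])

# Applying R to every row vector v (R @ v) is equivalent to pdf.values @ R.T,
# so the result keeps the Nx3 shape
transformed = pd.DataFrame(pdf.values @ R.T, columns=pdf.columns)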

Regards and thanks

David

 


4 REPLIES

JamesDallimore
New Contributor III

Hi David, I am having the same issue ("ModuleNotFoundError: No module named ...") when using applyInPandas. Did you ever resolve this?

Carlose
New Contributor II

Hello David. Same as James: "I am having the same issue ('ModuleNotFoundError: No module named ...') when using applyInPandas. Did you ever resolve this?"

david3
New Contributor III

I solved this by defining the applyInPandas transformation inside the function it is used in:

 


import pandas as pd

def apply_coordinate_transform(data):
    # Defining the pandas function inline keeps it in the enclosing closure,
    # so cloudpickle serializes it by value and ships it to the executors
    # instead of trying to import the coordinate_transformation module there
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf
    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
               .applyInPandas(coordinate_transform, schema=schema)

    return data

 

 

david3
New Contributor III

Hi

yes, I discovered three working possibilities:

  1. Define the pandas function as an inline function, as pointed out above.
  2. Define the pandas function in the same script that is imported as a "library" in the DLT config:

     libraries:
       - notebook:
           path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py

  3. Install your Python library as a wheel (.whl) on the cluster; a sketch follows below.
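For option 3, a minimal sketch, assuming the wheel built from this repo has already been uploaded (the wheel path below is hypothetical):

# At the top of a pipeline notebook; DLT supports %pip magic commands.
%pip install /dbfs/FileStore/wheels/analytics_pipelines-0.1.0-py3-none-any.whl

# After installation the package is importable on the driver and the executors,
# so applyInPandas can deserialize functions that reference it:
from analytics_pipelines.coordinate_transformation import apply_coordinate_transform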

Regards

David
