07-05-2023 04:50 AM - edited 07-05-2023 07:50 AM
Hi
I have the problem that my module is not found when it is used inside a user-defined function. The precise error message is posted below. My repo structure is as follows:
analytics-data-pipelines
├── analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   └── testdata
│       ├── configuration.csv
│       ├── input.avro
│       └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
└── tests
    ├── __init__.py
    └── test_transformations.py
In the delta_live_tables folder I have a notebook that does something like this:
import sys
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')
import pipeline
config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)
In pipeline.define_ingestion_pipeline I define a number of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations:
import dlt
from transformations import *
from coordinate_transformation import apply_coordinate_transform
# .....

def define_ingestion_pipeline(spark, config):
    # ....

    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)

    # ...
Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:
import pandas as pd

def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    # get_coordinate_transformation is defined elsewhere in coordinate_transformation.py
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf

def apply_coordinate_transform(data):
    # ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
        .applyInPandas(coordinate_transform, schema=schema)
    return data
Apparently coordinate_transformation.py is not available on the worker nodes, but why?
Error Message:
  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'
(Unrelated: could somebody point me to a way to multiply a 3x3 matrix onto an Nx3 DataFrame, resulting in an Nx3 DataFrame?)
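A minimal sketch of one way to do this on the pandas side (the column names x, y, z and the matrix values are assumptions, not from the post): multiplying every row vector by M is the same as right-multiplying the Nx3 array by M transposed.
import numpy as np
import pandas as pd

# Hypothetical 3x3 transformation matrix and Nx3 coordinate frame.
M = np.array([[0.0, -1.0, 0.0],
              [1.0,  0.0, 0.0],
              [0.0,  0.0, 1.0]])
df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0], "z": [5.0, 6.0]})

# For each row v, M @ v stacked over all rows equals X @ M.T, where X is the Nx3 array.
transformed = pd.DataFrame(df[["x", "y", "z"]].to_numpy() @ M.T,
                           columns=["x", "y", "z"], index=df.index)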
Regards and thanks
David
11-22-2023 02:54 AM
Hi David, I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?
12-04-2023 01:51 PM
Hello David. Same as James: "I am having the same issue for 'ModuleNotFoundError: No module named ...' when using applyInPandas. Did you ever resolve this?"
12-05-2023 01:24 AM
I solved this by defining the applyInPandas function inside the function that uses it, so the workers never need to import coordinate_transformation (the nested function is serialized by value along with the task):
def apply_coordinate_transform(data):
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf

    # ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
        .applyInPandas(coordinate_transform, schema=schema)
    return data
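A minimal runnable sketch of the same pattern (the column names and the offset logic are placeholders, not from the thread): because the pandas function is defined inside the caller, cloudpickle ships it by value and the workers never have to import the surrounding module.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def apply_offset(data):
    def add_offset(group_keys, pdf: pd.DataFrame) -> pd.DataFrame:
        # group_keys holds the grouping column values for this group
        pdf["value"] = pdf["value"] + 1.0
        return pdf

    return data.groupBy("serialnumber").applyInPandas(add_offset, schema=data.schema)

df = spark.createDataFrame([("a", 1.0), ("a", 2.0), ("b", 3.0)],
                           ["serialnumber", "value"])
apply_offset(df).show()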
12-05-2023 02:52 AM
Hi
Yes, I discovered three working possibilities; the one shown here registers the file containing the high-level UDFs as a notebook library in the pipeline settings:
libraries:
  - notebook:
      path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py
Regards
David