07-05-2023 04:50 AM - edited 07-05-2023 07:50 AM
Hi,
I have the problem that my "module" is not found when it is used inside a user-defined function. The exact error message is posted below. My repo structure is as follows:
analytics-data-pipelines
├── analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   └── testdata
│       ├── configuration.csv
│       ├── input.avro
│       └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
└── tests
    ├── __init__.py
    └── test_transformations.py
In the delta_live_tables folder I have a notebook that does something like this:

import sys
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')

import pipeline

config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)
In pipeline.define_ingestion_pipeline I define a bunch of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations.
import dlt
from transformations import *
from coordinate_transformation import apply_coordinate_transform
# .....

def define_ingestion_pipeline(spark, config):
    ....

    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)

    ...
Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:
def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf

def apply_coordinate_transform(data):
    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...) \
        .applyInPandas(coordinate_transform, schema=schema)
    return data
Apparently coordinate_transformation.py is not available there, but why?
Error message:

File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'
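My suspicion is that sys.path.append only affects the driver, so when an executor unpickles the UDF it cannot import the module. Would shipping the file to the executors help? A sketch of what I mean (untested, and I don't know whether this is allowed inside a DLT pipeline; the path is just my repo layout from above):

# untested idea: make the module importable on the executors as well
spark.sparkContext.addPyFile(
    '/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines/coordinate_transformation.py'
)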
(Unrelated: could somebody point out to me a way to multiply a 3x3 matrix onto a large Nx3 DataFrame, resulting in an Nx3 DataFrame?)
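For illustration, the plain pandas/numpy version of what I mean would be roughly this (the column names and the matrix are made up):

import numpy as np
import pandas as pd

M = np.array([[1.0, 0.0, 0.0],
              [0.0, 0.0, -1.0],
              [0.0, 1.0, 0.0]])  # some example 3x3 rotation

pdf = pd.DataFrame({'x': [1.0, 2.0], 'y': [3.0, 4.0], 'z': [5.0, 6.0]})

# (N x 3) @ (3 x 3)^T gives the transformed N x 3 coordinates
pdf[['x', 'y', 'z']] = pdf[['x', 'y', 'z']].to_numpy() @ M.T

I am looking for a good way to do the same on a Spark DataFrame.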
Regards and thanks
David
11-22-2023 02:54 AM
Hi David, I am having the same issue ("ModuleNotFoundError: No module named ...") when using applyInPandas. Did you ever resolve this?
12-04-2023 01:51 PM
Hello David. Same as James: "I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?"
12-05-2023 01:24 AM
I solved this by defining the applyInPandas transformation inside of the function that it's used in; presumably cloudpickle then serializes the nested function by value, so the workers never need to import the module at all.
def apply_coordinate_transform(data):
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf

    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...) \
        .applyInPandas(coordinate_transform, schema=schema)
    return data
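For completeness, here is a minimal self-contained version of the pattern (the doubling of x is a stand-in for the real coordinate math, and the column names are placeholders):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('A', 1.0), ('A', 2.0), ('B', 3.0)],
    'serialnumber string, x double',
)

def apply_coordinate_transform(data):
    # nested on purpose: cloudpickle ships the function by value,
    # so the executors do not have to import this module
    def coordinate_transform(group_keys, pdf):
        pdf['x'] = pdf['x'] * 2.0  # stand-in for the real transformation
        return pdf

    return data.groupBy('serialnumber').applyInPandas(
        coordinate_transform, schema=data.schema
    )

apply_coordinate_transform(df).show()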
12-05-2023 02:52 AM
Hi,
yes, I discovered three working possibilities. One of them is to declare the notebook that contains the high-level UDFs directly as a library in the pipeline settings:

libraries:
  - notebook:
      path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py
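In the JSON pipeline settings (like the data_ingestion.json in my repo) the equivalent would look roughly like this (the name and path are examples, not my actual configuration):

{
  "name": "data_ingestion",
  "libraries": [
    {
      "notebook": {
        "path": "/Repos/<user>/analytics-data-pipelines/delta_live_tables/data_ingestion"
      }
    }
  ]
}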
Regards
David