07-05-2023 04:50 AM - edited 07-05-2023 07:50 AM
Hi,
I have the problem that my module is not found when it is used inside a user-defined function. The exact error message is posted below. My repo structure is as follows:
analytics-data-pipelines
├── analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   └── testdata
│       ├── configuration.csv
│       ├── input.avro
│       └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
└── tests
    ├── __init__.py
    └── test_transformations.py
In the delta_live_tables folder I have a notebook that does something like this:
import sys
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')

import pipeline

config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)
In pipeline.define_ingestion_pipeline I define a number of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations:
import dlt
from transformations import *
from coordinate_transformation import apply_coordinate_transform
# .....

def define_ingestion_pipeline(spark, config):
    ...
    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)
    ...
Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:
def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf

def apply_coordinate_transform(data):
    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...) \
        .applyInPandas(coordinate_transform, schema=schema)
    return data
Apparently coordinate_transformation.py is not available, but why?
Error message:
File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'
(Unrelated: could somebody point me to a way to multiply a 3x3 matrix onto an Nx3 DataFrame, resulting in an Nx3 DataFrame?)
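For illustration, a rough NumPy/pandas sketch of what I mean (the column names are just placeholders):

import numpy as np
import pandas as pd

# Example 3x3 transformation matrix (here a 90-degree rotation about the z-axis).
R = np.array([[0.0, -1.0, 0.0],
              [1.0,  0.0, 0.0],
              [0.0,  0.0, 1.0]])

# Nx3 frame of coordinates.
df = pd.DataFrame(np.random.rand(5, 3), columns=['x', 'y', 'z'])

# Apply R to every row at once: (N x 3) @ (3 x 3)^T stays N x 3.
transformed = pd.DataFrame(df.to_numpy() @ R.T, columns=df.columns)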
Regards and thanks
David
Accepted Solutions
12-05-2023 01:24 AM
I solved this by defining the applyInPandas function inside the function where it is used. At module level, cloudpickle pickles the UDF by reference (module name plus function name), and coordinate_transformation is not importable on the workers; as a nested function it is pickled by value, so the workers never need to import the module.
def apply_coordinate_transform(data):
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf

    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...) \
        .applyInPandas(coordinate_transform, schema=schema)
    return data
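A minimal self-contained sketch of the same pattern (the column names and the doubling transformation are placeholders, not the original logic):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def apply_coordinate_transform(data):
    # Nested definition: cloudpickle ships the function body itself to the
    # workers instead of a reference to its defining module.
    def coordinate_transform(group_keys, pdf: pd.DataFrame) -> pd.DataFrame:
        pdf['x'] = pdf['x'] * 2.0  # placeholder transformation
        return pdf

    return data.groupBy('serialnumber').applyInPandas(
        coordinate_transform, schema=data.schema
    )

df = spark.createDataFrame(
    [('A', 1.0), ('A', 2.0), ('B', 3.0)], ['serialnumber', 'x']
)
apply_coordinate_transform(df).show()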
12-05-2023 02:52 AM
Hi,
yes, I discovered three working possibilities:
- Define the pandas function as an inline (nested) function, as pointed out above.
- Define the pandas function in the same script that is imported as a library in the DLT pipeline configuration, e.g.:
  libraries:
    - notebook:
        path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py
- Install your Python library as a wheel (.whl) on the cluster (a rough sketch below).
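For the wheel option, a sketch under assumptions (the package name is taken from the repo layout above; the upload path is hypothetical):

# Build the wheel locally (the repo already uses Poetry):
#   poetry build   # writes dist/analytics_pipelines-<version>-py3-none-any.whl
# Upload it, then install it at the top of the DLT notebook:
%pip install /dbfs/FileStore/wheels/analytics_pipelines-0.1.0-py3-none-any.whl

# With the package installed, import through the package name instead of
# appending to sys.path:
from analytics_pipelines.coordinate_transformation import apply_coordinate_transform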
Regards
David
11-22-2023 02:54 AM
Hi David, I am having the same issue ("ModuleNotFoundError: No module named ...") when using applyInPandas. Did you ever resolve this?
12-04-2023 01:51 PM
Hello David, same as James: "I am having the same issue ('ModuleNotFoundError: No module named ...') when using applyInPandas. Did you ever resolve this?"