07-05-2023 04:50 AM - edited 07-05-2023 07:50 AM
Hi
I have the problem that my "module" is not found when it is used in a user-defined function. The precise error message is posted below. My repo structure is as follows:
analytics-data-pipelines
├── analytics_pipelines
│   ├── __init__.py
│   ├── coordinate_transformation.py
│   ├── data_quality_checks.py
│   ├── pipeline.py
│   └── transformations.py
├── delta_live_tables
│   ├── configurations
│   │   └── data_ingestion.json
│   └── data_ingestion.py
├── dist
├── notebooks
│   ├── local_example.ipynb
│   └── testdata
│       ├── configuration.csv
│       ├── input.avro
│       └── test.parquet
├── poetry.lock
├── pyproject.toml
├── README.md
└── tests
    ├── __init__.py
    └── test_transformations.py
In the delta_live_tables folder I have a notebook that does something like this:
import sys
sys.path.append('/Workspace/Repos/<user>/analytics-data-pipelines/analytics_pipelines')
import pipeline
config = pipeline.setup_config(mode, avro_raw_data)
pipeline.define_ingestion_pipeline(spark, config)
pipeline.define_summary_tables(spark, config)
In pipeline.define_ingestion_pipeline I define a number of Delta Live Tables via the Python API. I also import transformations.py inside pipeline.py to define the necessary data transformations.
import dlt
from transformations import *
from coordinate_transformation import apply_coordinate_transform
# .....

def define_ingestion_pipeline(spark, config):
    ....
    @dlt.table(
        comment='',
        path=...
    )
    def table_name():
        data = dlt.read_stream("other")
        return transform_data(data)
    ...
Everything works, except where I use a Python user-defined function in one of the transformations. The corresponding transformation looks similar to this:
def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
    trafo = get_coordinate_transformation(group_keys[0])
    # ... do some pandas code here
    return pdf

def apply_coordinate_transform(data):
    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
        .applyInPandas(coordinate_transform, schema=schema)
    return data
Apparently coordinate_transformation.py is not available, but why?
Error Message:
File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'coordinate_transformation'
(Unrelated:
could somebody point me to a way to multiply a 3x3 matrix onto an Nx3 DataFrame, resulting in an Nx3 DataFrame?)
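For the unrelated question, a minimal numpy/pandas sketch, assuming the Nx3 data sits in columns x, y, z (R, pdf, and the column names here are illustrative, not from the thread): multiplying every row by a 3x3 matrix R is a single matrix product with R transposed.

import numpy as np
import pandas as pd

# Illustrative 3x3 matrix R and an Nx3 DataFrame pdf with columns x, y, z.
R = np.array([[0.0, -1.0, 0.0],
              [1.0,  0.0, 0.0],
              [0.0,  0.0, 1.0]])
pdf = pd.DataFrame({'x': [1.0, 2.0], 'y': [0.0, 1.0], 'z': [3.0, 4.0]})

# For each row v, the transformed row is R @ v, which for all rows at
# once is pdf @ R.T; the result is again Nx3.
transformed = pd.DataFrame(pdf[['x', 'y', 'z']].to_numpy() @ R.T,
                           columns=['x', 'y', 'z'], index=pdf.index)
print(transformed)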
Regards and thanks
David
11-22-2023 02:54 AM
Hi David, I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?
12-04-2023 01:51 PM
Hello David. Same as James: "I am having the same issue for "ModuleNotFoundError: No module named ..." when using applyInPandas. Did you ever resolve this?"
12-05-2023 01:24 AM
I solved this by defining the applyInPandas function inside the function where it's used:
def apply_coordinate_transform(data):
    def coordinate_transform(group_keys, pdf) -> pd.DataFrame:
        trafo = get_coordinate_transformation(group_keys[0])
        # ... do some pandas code here
        return pdf

    ...
    schema = data.schema
    data = data.groupBy('serialnumber', ...)\
        .applyInPandas(coordinate_transform, schema=schema)
    return data
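A likely explanation for why this works: cloudpickle serializes a locally defined function by value, together with its closure, while a function imported from a module is serialized by reference and has to be re-imported on each worker; in this setup that re-import fails because the sys.path entry appended in the notebook presumably exists only on the driver. A minimal, self-contained sketch of the nested-function pattern (the DataFrame, column, and function names are made up for illustration, not from the thread):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def apply_offset(data):
    # Defined in a local scope, add_offset is pickled by value, so the
    # workers never need to import the module that contains it.
    def add_offset(pdf: pd.DataFrame) -> pd.DataFrame:
        pdf['value'] = pdf['value'] + 1.0
        return pdf

    return data.groupBy('serialnumber').applyInPandas(add_offset,
                                                      schema=data.schema)

df = spark.createDataFrame([('a', 1.0), ('a', 2.0), ('b', 3.0)],
                           ['serialnumber', 'value'])
apply_offset(df).show()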
12-05-2023 02:52 AM
Hi,
yes, I discovered three working possibilities. One of them is to declare the file containing the high-level UDFs as a library of the DLT pipeline, so that it is executed as pipeline source code:

libraries:
  - notebook:
      path: ./pipelines/your_dlt_declaration_containing_high_level_udfs.py
Regards
David