TLDR - UDFs work fine when imported from the `utilities/` folder in DLT pipelines, but a custom Python Data Source (DataSource API) fails with `ModuleNotFoundError: No module named 'utilities'` during serialization. Only inline definitions work. I need reusable DataSource classes across multiple transformations.
Context - The Databricks auto-generated SDP project works perfectly with the utilities folder pattern:
Working Structure:
```
SDP_Project/
├── transformations/
│   └── pipeline.py
└── utilities/
    └── utils.py
```
Working Code:
```python
# transformations/pipeline.py
from pyspark.sql.functions import col
from utilities import utils

def my_table():  # DLT table decorator and upstream read trimmed for brevity
    return df.withColumn("valid", utils.is_valid_email(col("email")))

# utilities/utils.py
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

pattern = r"[^@\s]+@[^@\s]+\.[^@\s]+"  # simplified email pattern

@udf(returnType=BooleanType())
def is_valid_email(email):
    return re.match(pattern, email) is not None
```
Problem - The same pattern fails with the Python Data Source API.
My Structure:
```
Test_DLT/
├── transformations/
│   └── dlt.py
└── utilities/
    └── utils.py
```
My Code:
```python
# utilities/utils.py
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader

class SomeCustomReader(DataSourceReader):
    def read(self, partition):
        # custom logic here
        yield Row(**data)

class SomeCustomSource(DataSource):
    @classmethod
    def name(cls):
        return "some_source"

    def reader(self, schema):
        return SomeCustomReader(self.options)

# transformations/dlt.py
from pyspark.sql import SparkSession
from utilities.utils import SomeCustomSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SomeCustomSource)

def read_data():
    df = spark.read.format("some_source").option(...).load()
    return df
```
Error
```
ModuleNotFoundError: No module named 'utilities'
pyspark.serializers.SerializationError: Caused by cloudpickle.loads(obj, encoding=encoding)
```
What Works
Only inline class definitions work (defining DataSource classes directly in dlt.py).
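For completeness, this is roughly what the working inline version looks like (a minimal sketch mirroring the classes above; the reader body is a placeholder):
```python
# transformations/dlt.py -- inline definition, which serializes without errors
from pyspark.sql import Row, SparkSession
from pyspark.sql.datasource import DataSource, DataSourceReader

class SomeCustomReader(DataSourceReader):
    def read(self, partition):
        yield Row(value="example")  # placeholder row

class SomeCustomSource(DataSource):
    @classmethod
    def name(cls):
        return "some_source"

    def reader(self, schema):
        return SomeCustomReader(self.options)

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SomeCustomSource)
```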
Questions
1. Am I doing anything wrong?
2. Is there a way to make custom DataSource APIs work with the utilities folder pattern?
Benefit
I want to avoid duplicating the inline definitions across n transformations n times if possible - I need reusable DataSource definitions.
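For reference, this is the kind of reuse I'm after: a single shared helper that every transformation file can call instead of repeating the class definitions inline (names like `register_custom_sources` are hypothetical; this is the goal, not something that currently works in my setup):
```python
# utilities/registry.py (hypothetical helper I'd like to be able to write)
from pyspark.sql import SparkSession
from utilities.utils import SomeCustomSource

def register_custom_sources(spark: SparkSession) -> None:
    # Register every shared custom data source once per session,
    # so individual transformation files don't repeat the class definitions.
    spark.dataSource.register(SomeCustomSource)

# transformations/<any_pipeline>.py -- intended usage
# from utilities.registry import register_custom_sources
# register_custom_sources(spark)
# df = spark.read.format("some_source").load()
```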