Python DataSource API utilities/ Import Fails in Spark Declarative Pipeline

smpa01
Contributor
TLDR - UDFs work fine when imported from the `utilities/` folder in DLT pipelines, but custom Python DataSource API classes fail with `ModuleNotFoundError: No module named 'utilities'` during serialization. Only inline definitions work. I need reusable DataSource classes across multiple transformations.
 
Context - A Databricks auto-generated SDP (Spark Declarative Pipeline) project works perfectly with the utilities folder pattern:
 
Working Structure:
```
SDP_Project/
├── transformations/
│   └── pipeline.py
└── utilities/
    └── utils.py
```
Working Code:
```python
# transformations/pipeline.py
from pyspark.sql.functions import col
from utilities import utils

@DP.table  # DP: the declarative-pipelines module imported elsewhere in the file
def my_table():
    # df is built earlier in the transformation (omitted here)
    return df.withColumn("valid", utils.is_valid_email(col("email")))

# utilities/utils.py
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def is_valid_email(email):
    # pattern is a module-level regex string (omitted here)
    return re.match(pattern, email) is not None
```
 
Problem - The same pattern fails with the Python Data Source API.
My Structure:
```
Test_DLT/
├── transformations/
│   └── dlt.py  
└── utilities/
    └── utils.py
```
My Code:
```python
# utilities/utils.py
from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader

class SomeCustomReader(DataSourceReader):
    def read(self, partition):
        # custom logic here
        yield Row(**data)

class SomeCustomSource(DataSource):
    @classmethod
    def name(cls):
        return "some_source"

    def reader(self, schema):
        return SomeCustomReader(self.options)

# transformations/dlt.py
from pyspark.sql import SparkSession
from utilities.utils import SomeCustomSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SomeCustomSource)

@DP.table
def read_data():
    df = spark.read.format("some_source").option(...).load()
    return df
```
Error
```
ModuleNotFoundError: No module named 'utilities'
pyspark.serializers.SerializationError: Caused by cloudpickle.loads(obj, encoding=encoding)
```
What Works
Only inline class definitions work (defining the DataSource classes directly in dlt.py), as sketched below.
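For reference, a minimal sketch of what the working inline version looks like, using the same hypothetical `SomeCustomSource`/`SomeCustomReader` names as above; the schema and row contents are placeholders, and `DP` stands in for the declarative-pipelines module used elsewhere in the pipeline:
```python
# transformations/dlt.py -- everything defined inline, so cloudpickle never
# has to resolve the 'utilities' module on the executors
from pyspark.sql import Row, SparkSession
from pyspark.sql.datasource import DataSource, DataSourceReader

class SomeCustomReader(DataSourceReader):
    def read(self, partition):
        # placeholder row; the real custom logic goes here
        yield Row(value="example")

class SomeCustomSource(DataSource):
    @classmethod
    def name(cls):
        return "some_source"

    def schema(self):
        return "value STRING"  # placeholder schema matching the row above

    def reader(self, schema):
        return SomeCustomReader(self.options)

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SomeCustomSource)

@DP.table
def read_data():
    return spark.read.format("some_source").load()
```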
Questions
1. Am I doing anything wrong?
2. Is there a way to make custom Python DataSource classes work with the utilities folder pattern?
Benefit
I want to avoid defining the same DataSource inline n times across n transformations if possible - I need reusable DataSource definitions.