My new DLT pipeline throws a ModuleNotFoundError when I try to request data from an API through a custom PySpark data source. For some more context: I develop in my local IDE and deploy to Databricks using asset bundles. The pipeline runs fine if I return a static DataFrame instead of making the request to the API.
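That working version is essentially just a dummy DataFrame in place of the read (simplified, not my real columns):

    @dlt.table(name="my_table", table_properties={"quality": "bronze"})
    def dlt_ingestion():
        # placeholder data instead of the API call; this variant deploys and runs fine
        return spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])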
Here's my pipeline code:
import json
import os

import dlt
import requests
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

# build_query_params and my_schema are imported from my own package (more on that below)


@dlt.table(
    name="my_table",
    table_properties={"quality": "bronze"},
)
def dlt_ingestion():
    api_key = os.getenv("API_KEY")
    params = build_query_params()
    # register the custom data source before reading with it
    spark.dataSource.register(MyDataSource)
    response_df = (
        spark.read.format("mydatasource")
        .option("api_key", api_key)
        .option("params", json.dumps(params))
        .load()
    )
    return response_df

class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "mydatasource"

    def schema(self):
        return my_schema

    def reader(self, schema: StructType):
        return MyDataSourceReader(schema, self.options)

class MyDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.api_key = self.options.get("api_key")
        self.base_url = "https://my/api/url"
        self.timeout = int(self.options.get("timeout", 600))
        json_params = self.options.get("params")
        if json_params:
            try:
                self.params = json.loads(json_params)
            except json.JSONDecodeError:
                raise ValueError("Invalid JSON format for params")
        else:
            self.params = {}

    def read(self, partition):
        headers = {"Accept": "application/json", "X-Api-Key": self.api_key}
        page = 1
        result = []
        next_page = True
        self.params["pageSize"] = 1000
        # page through the API until a short page signals the end
        while next_page:
            self.params["page"] = page
            response = requests.get(self.base_url, headers=headers, params=self.params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
            for record in data:
                result.append(record)
            if len(data) == self.params["pageSize"]:
                page += 1
            else:
                next_page = False
        # emit every collected record as a row tuple
        for item in result:
            yield tuple(item.values())

    def partitions(self):
        from pyspark.sql.datasource import InputPartition

        # single partition: the whole paged read happens in one task
        return [InputPartition(0)]
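For completeness: the helpers used here (build_query_params and my_schema) live under a src package inside the bundle and get imported at the top of the pipeline file. The real module path is longer, but it amounts to something like this (module names simplified):

    # module path simplified; the actual package layout differs
    from src.utils import build_query_params, my_schema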
The error I'm getting:
pyspark.errors.exceptions.captured.AnalysisException: [PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
return self.loads(obj)
^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
return cloudpickle.loads(obj, encoding=encoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'src'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
return self.loads(obj)
^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
return cloudpickle.loads(obj, encoding=encoding)