I should start by saying that everything works fine if I copy and paste it all into a single notebook and run it. The problem starts as soon as we try to have any structure in our application repository, and so far we have only run into it with applyInPandasWithState; most other things work just fine.
For context, our Spark DLT pipelines are generally split into application code and an SDK library called `data_lake`.
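The library side is laid out roughly like this (illustrative, including the __init__.py; the only part that really matters is that actions.py sits inside a data_lake package under a lib folder, which is the folder we add to sys.path in the notebook further down):
.../data_lake/lib/
├─ data_lake/
│  ├─ __init__.py
│  ├─ actions.py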
When running the following code I keep getting a ModuleNotFoundError.
data_lake/actions.py
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union

from pandas import DataFrame as PandasDataFrame, Series
from pyspark.sql import Column, DataFrame
from pyspark.sql.streaming.state import GroupState
from pyspark.sql.types import StructType


class ArbitraryStatefulOperator(ABC):
    """Base class for performing arbitrary stateful operations."""

    def __init__(
        self,
        output_schema: Union[StructType, str],
        input_schema: Union[StructType, str],
        group_ids: List[Union[Column, str]],
        output_mode: str,
        timeout_type: str,
    ):
        self.output_schema = output_schema
        self.input_schema = input_schema
        self.group_ids = group_ids
        self.output_mode = output_mode
        self.timeout_type = timeout_type
        self._messages: List[Dict[str, Any]] = []

    @property
    def messages(self) -> List[Dict[str, Any]]:
        return self._messages

    @staticmethod
    @abstractmethod
    def parse_row(row: Series[Any]) -> Dict[str, Any]:
        """Parse a single row of a pandas dataframe into a dictionary."""

    @staticmethod
    @abstractmethod
    def process_timeout(old_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Generate a new state when a timeout happens."""

    @staticmethod
    @abstractmethod
    def process_event(
        old_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> Tuple[Any, ...]:
        """Generate a new state when messages are received."""

    @staticmethod
    @abstractmethod
    def init_state(messages: List[Dict[str, Any]]) -> Tuple[Any, ...]:
        """Initialise a new state when a new key is detected."""

    @staticmethod
    @abstractmethod
    def convert_to_df(
        key: Tuple[Any],
        new_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> List[PandasDataFrame]:
        """Create a list of pandas dataframes to output."""

    @staticmethod
    @abstractmethod
    def parse_state(spark_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Parse the state returned by Spark."""

    @abstractmethod
    def set_timeout(
        self,
        set_timestamp_func: Callable[[int], None],
        set_duration_func: Callable[[int], None],
    ) -> None:
        """Set the timeout for the group."""

    def _update_state(
        self,
        key: Tuple[str],
        pandas_dfs: Iterable[PandasDataFrame],
        group_state: GroupState,
    ):
        # Parse the incoming messages
        messages: List[Dict[str, Any]] = []
        for pandas_df in pandas_dfs:
            for _, row in pandas_df.iterrows():  # type:ignore
                messages.append(self.parse_row(row))  # type:ignore

        # Initialize the state
        if group_state.exists:
            old_state = self.parse_state(group_state.get)  # type:ignore
        else:
            old_state = self.init_state(messages)

        # Update the state
        if group_state.hasTimedOut:
            new_state = self.process_timeout(old_state)
        else:
            new_state = self.process_event(old_state, messages)
        group_state.update(new_state)  # type:ignore

        # Set the next timeout
        self.set_timeout(
            group_state.setTimeoutTimestamp,
            group_state.setTimeoutDuration,
        )
        return self.convert_to_df(key, new_state, messages)

    def compute_state(self, dataframe: DataFrame) -> DataFrame:
        """Get a dataframe containing the state of each group."""
        return dataframe.groupBy(*(self.group_ids)).applyInPandasWithState(
            self._update_state,
            self.output_schema,
            self.input_schema,
            self.output_mode,
            self.timeout_type,
        )
Let's say the application repo is laid out like this:
repo_root/
├─ app_notebook
├─ application/
│ ├─ state_machine.py
with the following code:
application/state_machine.py
from __future__ import annotations

from typing import Any, Callable, Dict, List, Tuple, Union

from pandas import DataFrame as PandasDataFrame, Series
from pyspark.sql import Column

from data_lake.actions import ArbitraryStatefulOperator


class TestStateCalcualtor(ArbitraryStatefulOperator):
    def __init__(self):
        output_schema = "key STRING, value STRING"
        input_schema = "value STRING"
        group_ids: List[Union[Column, str]] = ["key"]
        output_mode = "append"
        timeout_type = "ProcessingTimeTimeout"
        super().__init__(output_schema, input_schema, group_ids, output_mode, timeout_type)

    @staticmethod
    def parse_row(row: Series[Any]) -> Dict[str, Any]:
        """Parse a single row of a pandas dataframe into a dictionary."""
        return {
            "key": row.key,
            "value": row.value,
        }

    @staticmethod
    def process_timeout(old_state: Tuple[Any, ...]) -> Any:
        """Generate a new state when a timeout happens."""
        return ("",)

    @staticmethod
    def process_event(
        old_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> Any:
        """Generate a new state when messages are received."""
        return ("Message received",)

    @staticmethod
    def init_state(messages: List[Dict[str, Any]]) -> Tuple[Any, ...]:
        """Initialise a new state when a new key is detected."""
        return ("New key",)

    @staticmethod
    def convert_to_df(
        key: Tuple[str],
        new_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> List[PandasDataFrame]:
        """Create a list of pandas dataframes to output."""
        (id,) = key
        return [
            PandasDataFrame(
                {
                    "key": [id],
                    "value": [new_state[0]],
                }
            )
        ]

    @staticmethod
    def parse_state(spark_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Parse the state returned by Spark."""
        return spark_state

    def set_timeout(
        self,
        set_timestamp_func: Callable[[int], None],
        set_duration_func: Callable[[int], None],
    ) -> None:
        # Keep the group alive for 15 minutes between events
        set_duration_func(15 * 60 * 1000)
app_notebook
import os
import sys

sys.path.append(os.path.abspath("/Workspace/Repos/.../path/to/data_lake/lib"))

import dlt
from application.state_machine import TestStateCalcualtor


@dlt.table()
def test_table():
    return TestStateCalcualtor().compute_state(dlt.read_stream("bronze_messages"))
We keep getting the following error:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 1daa2f5a-a7d0-4030-9673-e1fe02e11a76, runId = b6518b0b-fb61-4eb2-ad58-df868b29a205] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 5 in stage 2611.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2611.0 (TID 20310) (10.175.145.115 executor 2): org.apache.spark.api.python.PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake''. Full traceback below:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake'
During handling of the above exception, another exception occurred:
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake'
Just to reiterate: if I paste the contents of state_machine.py and actions.py into the app_notebook, everything works. The problem only appears when I have to import them.
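As far as I understand it (and this is an assumption on my part), cloudpickle serialises classes defined in the notebook itself (i.e. in __main__) by value, which would explain why the paste-in version works, whereas classes imported from a module are pickled by reference, so the executors have to be able to import data_lake themselves when they deserialise _update_state. The sys.path.append in the notebook presumably only affects the driver interpreter. If that is right, I would have expected something along these lines to be needed as well (untested sketch; I am not sure addPyFile is even usable from a DLT pipeline, and the path is the same elided one as above):
import shutil
import sys

LIB_ROOT = "/Workspace/Repos/.../path/to/data_lake/lib"
sys.path.append(LIB_ROOT)  # driver-side import path

# Bundle the library into a zip and ship it to the cluster so that the
# executors can also import `data_lake` when unpickling _update_state.
archive = shutil.make_archive("/tmp/data_lake", "zip", LIB_ROOT)
spark.sparkContext.addPyFile(archive)
Is something like that the right direction, or is there a supported way to make library code importable on the executors of a DLT pipeline?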