cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Module not found when using applyInPandasWithState in Repos

umarkhan
New Contributor II

I should start by saying that everything works fine if I copy and paste it all into a notebook and run it. The problem starts if we try to have any structure in our application repository. Also, so far we have only run into this problem with applyInPandasWithState. Most other stuff works just fine.

For context, our spark DLT pipelines are generally broken into application code and an SDK library called `data_lake`

when running the following code I keep getting a module not found error.

data_lake/actions.py

 

 

class ArbitraryStatefulOperator(ABC):
    """Base class for performing Arbitrary stateful operations"""

    def __init__(
        self,
        output_schema: Union[StructType, str],
        input_schema: Union[StructType, str],
        group_ids: List[Union[Column, str]],
        output_mode: str,
        timeout_type: _TimeoutType,
    ):
        self.output_schema = output_schema
        self.input_schema = input_schema
        self.group_ids = group_ids
        self.output_mode = output_mode
        self.timeout_type = timeout_type
        self._messages: List[Dict[str, Any]] = []

    @property
    def messages(self) -> List[Dict[str, Any]]:
        return self._messages

    @staticmethod
    @abstractmethod
    def parse_row(row: Series[Any]) -> Dict[str, Any]:
        """Parse a single row for of a pandas dataframe into a dictionary"""

    @staticmethod
    @abstractmethod
    def process_timeout(old_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Generate a new state when a timeout happens"""

    @staticmethod
    @abstractmethod
    def process_event(
        old_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> Tuple[Any, ...]:
        """Generate a new state when messages are recieved."""

    @staticmethod
    @abstractmethod
    def init_state(messages: List[Dict[str, Any]]) -> Tuple[Any, ...]:
        """Initialise a new state when a new key is detected."""

    @staticmethod
    @abstractmethod
    def convert_to_df(
        key: Tuple[Any],
        new_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> List[PandasDataFrame]:
        """Create a list of pandas dataframes to output"""

    @staticmethod
    @abstractmethod
    def parse_state(spark_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Parse the state returned by spark"""

    @abstractmethod
    def set_timeout(
        self,
        set_timestamp_func: Callable[[int], None],
        set_duration_func: Callable[[int], None],
    ) -> None:
        """Set the timeout for the group."""

    def _update_state(
        self,
        key: Tuple[str],
        pandas_dfs: Iterable[PandasDataFrame],
        group_state: GroupState,
    ):
        # parse the messages
        messages: List[Dict[str, Any]] = []
        for pandas_df in pandas_dfs:
            for _, row in pandas_df.iterrows():  # type:ignore
                messages.append(self.parse_row(row))  # type:ignore

        # Iniialize the state
        if group_state.exists:
            old_state = self.parse_state(group_state.get)  # type:ignore
        else:
            old_state = self.init_state(messages)

        # update the state
        if group_state.hasTimedOut:
            new_state = self.process_timeout(old_state)
        else:
            new_state = self.process_event(old_state, messages)
        group_state.update(new_state)  # type:ignore

        # set the next timeout
        self.set_timeout(
            group_state.setTimeoutTimestamp,
            group_state.setTimeoutDuration,
        )

        return self.convert_to_df(key, new_state, messages)

    def compute_state(self, dataframe: DataFrame) -> DataFrame:
        """Get a datafreame containing the state of each group."""
        return dataframe.groupBy(*(self.group_ids)).applyInPandasWithState(
            self._update_state,
            self.output_schema,
            self.input_schema,
            self.output_mode,
            self.timeout_type,
        )

 

 

 

 

Lets say the application repo is laid out like this:

repo_root/
├─ app_notebook
├─ application/
│ ├─ state_machine.py

with the following code

application/state_machine.py

 

 

class TestStateCalcualtor(ArbitraryStatefulOperator):
    def __init__(self):
        output_schema = "key STRING, value STRING"
        input_schema = "value STRING"
        group_ids: List[Union[Column, str]] = ["key"]
        output_mode = "append"
        timeout_type="ProcessingTimeTimeout",
        super().__init__(output_schema, input_schema, group_ids, output_mode, timeout_type)

    @staticmethod
    def parse_row(row: Series[Any]) -> Dict[str, Any]:
        """Parse a single row for of a pandas dataframe into a dictionary"""
        return {
            "key": row.key,
            "value": row.value,
        }

    @staticmethod
    def process_timeout(old_state: Tuple[Any, ...]) -> Any:
        """Generate a new state when a timeout happens"""
        return ("",)

    @staticmethod
    def process_event(
        old_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> Any:
        """Generate a new state when messages are recieved."""
        return ("Message received",)

    @staticmethod
    def init_state(messages: List[Dict[str, Any]]) -> Tuple[Any, ...]:
        """Initialise a new state when a new key is detected."""
        return ("New key",)

    @staticmethod
    def convert_to_df(
        key: Tuple[str],
        new_state: Tuple[Any, ...],
        messages: List[Dict[str, Any]],
    ) -> List[PandasDataFrame]:
        """Create a list of pandas dataframes to output"""
        (id,) = key
        return [
            PandasDataFrame(
                {
                    "key": [id],
                    "value": [new_state[0]],
                }
            )
        ]

    @staticmethod
    def parse_state(spark_state: Tuple[Any, ...]) -> Tuple[Any, ...]:
        """Parse the state returned by spark"""
        return spark_state

    def set_timeout(
        self,
        set_timestamp_func: Callable[[int], None],
        set_duration_func: Callable[[int], None],
    ) -> None:
        set_duration_func(15*60*1000)

 

 

 

app_notebook

 

 

sys.path.append(os.path.abspath("/Workspace/Repos/.../path/to/data_lake/lib"))

import dlt

from application.state_machine import TestStateCalcualtor

@dlt.table()
def test_table():
    return TestStateCalcualtor().compute_state(dlt.read_stream("bronze_messages"))

 

 

 

We we keep getting the following error:

 

 

 

org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 1daa2f5a-a7d0-4030-9673-e1fe02e11a76, runId = b6518b0b-fb61-4eb2-ad58-df868b29a205] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 5 in stage 2611.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2611.0 (TID 20310) (10.175.145.115 executor 2): org.apache.spark.api.python.PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake''. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake'

During handling of the above exception, another exception occurred:

pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 188, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 540, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'data_lake'

 

 

 

 Just to reiterate, if I paste the contents of state_machine.py and actions.py into the app_notebook we have no problem. the problem is if I have to import them.

1 REPLY 1

jose_gonzalez
Moderator
Moderator

which DBR version are you using? does it works on non DLT jobs?

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!