dev4py.utils.pipeline.step_pipeline

The StepPipeline class is used in order to create a Pipeline where each step can stop the execution

  1"""The `StepPipeline` class is used in order to create a Pipeline where each step can stop the execution"""
  2
  3# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py).
  4#
  5# Licensed under the Apache License, Version 2.0 (the "License");
  6# you may not use this file except in compliance with the License.
  7# You may obtain a copy of the License at
  8#
  9#      https://www.apache.org/licenses/LICENSE-2.0
 10#
 11# Unless required by applicable law or agreed to in writing, software
 12# distributed under the License is distributed on an "AS IS" BASIS,
 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14# See the License for the specific language governing permissions and
 15# limitations under the License.
 16
 17from __future__ import annotations
 18
 19from dataclasses import dataclass
 20from typing import Generic, Final, Any, Optional, Union
 21
 22from dev4py.utils.joptional import JOptional
 23from dev4py.utils.objects import require_non_none, require_non_none_else
 24from dev4py.utils.types import Function, T, N, IN, OUT
 25
 26
 27@dataclass
 28class StepResult(Generic[T]):
 29    """
 30    Represents a Step result
 31
 32    Args:
 33        value: the result value of T type
 34        go_next: True if the next step must be executed, otherwise False
 35    """
 36    value: T
 37    go_next: bool = True
 38
 39
 40class _Step(Generic[IN, OUT]):
 41
 42    def __init__(self, handler: Function[IN, StepResult[OUT]]):
 43        """_Step constructor"""
 44        self._handler: Final[Function[IN, StepResult[OUT]]] = require_non_none(handler)
 45        self._next: JOptional[_Step[OUT, Any]] = JOptional.empty()
 46
 47    def add_next(self, next_handler: Function[OUT, StepResult[N]]) -> _Step[OUT, N]:
 48        """
 49        Adds a next step and returns it
 50
 51        Args:
 52            next_handler: the next step handler
 53
 54        Returns:
 55            _Step[OUT, N]: The new created step
 56
 57        """
 58        next_step: _Step[OUT, N] = _Step(handler=require_non_none(next_handler))
 59        self._next = JOptional.of(next_step)
 60        return next_step
 61
 62    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
 63        """
 64        Executes the current step and if `go_next` is True call the next step if it exists. Returns the last executed
 65        step.
 66
 67        Note: It means if the returned StepResult `go_next` field is TRUE all steps were executed. Otherwise, one step
 68        has stopped the execution.
 69
 70        Args:
 71            value: The value of IN type
 72
 73        Returns:
 74            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False
 75        """
 76        step_result: Final[StepResult[OUT]] = self._handler(value)
 77        return self._next.get().execute(step_result.value) \
 78            if step_result.go_next and self._next.is_present() else step_result
 79
 80
 81class StepPipeline(Generic[IN, OUT]):
 82    __CREATE_KEY: Final[object] = object()
 83
 84    @classmethod
 85    def of(cls, handler: Function[IN, StepResult[OUT]]) -> StepPipeline[IN, OUT]:
 86        """
 87        Returns a StepPipeline[IN, OUT] initialized with the given handler
 88
 89        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
 90        stop the current pipeline at this step `go_next` field must be set to False.
 91
 92        Args:
 93            handler: the first step handler
 94
 95        Returns:
 96            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
 97            OUT value
 98
 99        """
100        return cls._of(step=_Step(handler))
101
102    @classmethod
103    def _of(cls, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None) -> StepPipeline[IN, OUT]:
104        """
105        !!! PRIVATE _OF CLASS METHOD !!!
106
107        Returns a StepPipeline[IN, OUT] by using the given last step and the given head step
108
109        Args:
110            step: the last step
111            head: the head step
112
113        Returns:
114            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
115            OUT value
116        """
117        return StepPipeline(cls.__CREATE_KEY, step=step, head=head)
118
119    def __init__(self, create_key: object, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None):
120        """
121        StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT
122        value
123        """
124        assert create_key == self.__CREATE_KEY, "StepPipeline private constructor! Please use StepPipeline.of"
125        require_non_none(step)
126        self._head: _Step[IN, Any] = require_non_none_else(head, step)
127        self._last: _Step[Any, OUT] = step
128
129    def add_handler(self, next_handler: Function[OUT, StepResult[N]]) -> StepPipeline[IN, N]:
130        """
131        Adds a new step to the pipeline by using the given handler and returns the new Pipeline
132
133        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
134        stop the current pipeline at this step `go_next` field must be set to False.
135
136        Args:
137            next_handler: the new step handler
138
139        Returns:
140            StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N
141                value
142
143        """
144        return StepPipeline._of(step=self._last.add_next(next_handler), head=self._head)
145
146    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
147        """
148        Executes the current pipeline and returns the last executed Step
149
150        Note: The last executed step can be an intermediate one if `go_next` is false. If `go_next` is True it means
151        that the pipeline was totally executed for the given value.
152
153        Args:
154            value: The value of IN type
155
156        Returns:
157            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False
158
159        """
160        return self._head.execute(value)
@dataclass
class StepResult(typing.Generic[~T]):
28@dataclass
29class StepResult(Generic[T]):
30    """
31    Represents a Step result
32
33    Args:
34        value: the result value of T type
35        go_next: True if the next step must be executed, otherwise False
36    """
37    value: T
38    go_next: bool = True

Represents a Step result

Arguments:
  • value: the result value of T type
  • go_next: True if the next step must be executed, otherwise False
StepResult(value: ~T, go_next: bool = True)
value: ~T
go_next: bool = True
class StepPipeline(typing.Generic[~IN, ~OUT]):
 82class StepPipeline(Generic[IN, OUT]):
 83    __CREATE_KEY: Final[object] = object()
 84
 85    @classmethod
 86    def of(cls, handler: Function[IN, StepResult[OUT]]) -> StepPipeline[IN, OUT]:
 87        """
 88        Returns a StepPipeline[IN, OUT] initialized with the given handler
 89
 90        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
 91        stop the current pipeline at this step `go_next` field must be set to False.
 92
 93        Args:
 94            handler: the first step handler
 95
 96        Returns:
 97            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
 98            OUT value
 99
100        """
101        return cls._of(step=_Step(handler))
102
103    @classmethod
104    def _of(cls, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None) -> StepPipeline[IN, OUT]:
105        """
106        !!! PRIVATE _OF CLASS METHOD !!!
107
108        Returns a StepPipeline[IN, OUT] by using the given last step and the given head step
109
110        Args:
111            step: the last step
112            head: the head step
113
114        Returns:
115            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
116            OUT value
117        """
118        return StepPipeline(cls.__CREATE_KEY, step=step, head=head)
119
120    def __init__(self, create_key: object, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None):
121        """
122        StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT
123        value
124        """
125        assert create_key == self.__CREATE_KEY, "StepPipeline private constructor! Please use StepPipeline.of"
126        require_non_none(step)
127        self._head: _Step[IN, Any] = require_non_none_else(head, step)
128        self._last: _Step[Any, OUT] = step
129
130    def add_handler(self, next_handler: Function[OUT, StepResult[N]]) -> StepPipeline[IN, N]:
131        """
132        Adds a new step to the pipeline by using the given handler and returns the new Pipeline
133
134        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
135        stop the current pipeline at this step `go_next` field must be set to False.
136
137        Args:
138            next_handler: the new step handler
139
140        Returns:
141            StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N
142                value
143
144        """
145        return StepPipeline._of(step=self._last.add_next(next_handler), head=self._head)
146
147    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
148        """
149        Executes the current pipeline and returns the last executed Step
150
151        Note: The last executed step can be an intermediate one if `go_next` is false. If `go_next` is True it means
152        that the pipeline was totally executed for the given value.
153
154        Args:
155            value: The value of IN type
156
157        Returns:
158            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False
159
160        """
161        return self._head.execute(value)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

StepPipeline( create_key: object, step: dev4py.utils.pipeline.step_pipeline._Step[typing.Any, ~OUT], head: Optional[dev4py.utils.pipeline.step_pipeline._Step[~IN, Any]] = None)
120    def __init__(self, create_key: object, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None):
121        """
122        StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT
123        value
124        """
125        assert create_key == self.__CREATE_KEY, "StepPipeline private constructor! Please use StepPipeline.of"
126        require_non_none(step)
127        self._head: _Step[IN, Any] = require_non_none_else(head, step)
128        self._last: _Step[Any, OUT] = step

StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT value

@classmethod
def of( cls, handler: Callable[[~IN], StepResult[~OUT]]) -> StepPipeline[~IN, ~OUT]:
 85    @classmethod
 86    def of(cls, handler: Function[IN, StepResult[OUT]]) -> StepPipeline[IN, OUT]:
 87        """
 88        Returns a StepPipeline[IN, OUT] initialized with the given handler
 89
 90        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
 91        stop the current pipeline at this step `go_next` field must be set to False.
 92
 93        Args:
 94            handler: the first step handler
 95
 96        Returns:
 97            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
 98            OUT value
 99
100        """
101        return cls._of(step=_Step(handler))

Returns a StepPipeline[IN, OUT] initialized with the given handler

Note: the handler returned value is a StepResult. The function result value must be put in value field. To stop the current pipeline at this step go_next field must be set to False.

Arguments:
  • handler: the first step handler
Returns:

StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an OUT value

def add_handler( self, next_handler: Callable[[~OUT], StepResult[~N]]) -> StepPipeline[~IN, ~N]:
130    def add_handler(self, next_handler: Function[OUT, StepResult[N]]) -> StepPipeline[IN, N]:
131        """
132        Adds a new step to the pipeline by using the given handler and returns the new Pipeline
133
134        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
135        stop the current pipeline at this step `go_next` field must be set to False.
136
137        Args:
138            next_handler: the new step handler
139
140        Returns:
141            StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N
142                value
143
144        """
145        return StepPipeline._of(step=self._last.add_next(next_handler), head=self._head)

Adds a new step to the pipeline by using the given handler and returns the new Pipeline

Note: the handler returned value is a StepResult. The function result value must be put in value field. To stop the current pipeline at this step go_next field must be set to False.

Arguments:
  • next_handler: the new step handler
Returns:

StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N value

def execute( self, value: ~IN) -> StepResult[typing.Union[~OUT, typing.Any]]:
147    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
148        """
149        Executes the current pipeline and returns the last executed Step
150
151        Note: The last executed step can be an intermediate one if `go_next` is false. If `go_next` is True it means
152        that the pipeline was totally executed for the given value.
153
154        Args:
155            value: The value of IN type
156
157        Returns:
158            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False
159
160        """
161        return self._head.execute(value)

Executes the current pipeline and returns the last executed Step

Note: The last executed step can be an intermediate one if go_next is false. If go_next is True it means that the pipeline was totally executed for the given value.

Arguments:
  • value: The value of IN type
Returns:

StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if go_next is False