dev4py.utils.pipeline.step_pipeline
The StepPipeline class is used to create a pipeline in which each step can stop the execution.
"""The `StepPipeline` class is used in order to create a Pipeline where each step can stop the execution"""

# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, Final, Any, Optional, Union

from dev4py.utils.joptional import JOptional
from dev4py.utils.objects import require_non_none, require_non_none_else
from dev4py.utils.types import Function, T, N, IN, OUT


@dataclass
class StepResult(Generic[T]):
    """
    Represents a Step result

    Args:
        value: the result value of T type
        go_next: True if the next step must be executed, otherwise False
    """
    value: T
    go_next: bool = True


class _Step(Generic[IN, OUT]):

    def __init__(self, handler: Function[IN, StepResult[OUT]]):
        """_Step constructor"""
        self._handler: Final[Function[IN, StepResult[OUT]]] = require_non_none(handler)
        self._next: JOptional[_Step[OUT, Any]] = JOptional.empty()

    def add_next(self, next_handler: Function[OUT, StepResult[N]]) -> _Step[OUT, N]:
        """
        Adds a next step and returns it

        Args:
            next_handler: the next step handler

        Returns:
            _Step[OUT, N]: The new created step

        """
        next_step: _Step[OUT, N] = _Step(handler=require_non_none(next_handler))
        self._next = JOptional.of(next_step)
        return next_step

    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
        """
        Executes the current step and if `go_next` is True call the next step if it exists. Returns the last executed
        step.

        Note: It means if the returned StepResult `go_next` field is TRUE all steps were executed. Otherwise, one step
        has stopped the execution.

        Args:
            value: The value of IN type

        Returns:
            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False
        """
        step_result: Final[StepResult[OUT]] = self._handler(value)
        return self._next.get().execute(step_result.value) \
            if step_result.go_next and self._next.is_present() else step_result


class StepPipeline(Generic[IN, OUT]):
    __CREATE_KEY: Final[object] = object()

    @classmethod
    def of(cls, handler: Function[IN, StepResult[OUT]]) -> StepPipeline[IN, OUT]:
        """
        Returns a StepPipeline[IN, OUT] initialized with the given handler

        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
        stop the current pipeline at this step `go_next` field must be set to False.

        Args:
            handler: the first step handler

        Returns:
            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
            OUT value

        """
        return cls._of(step=_Step(handler))

    @classmethod
    def _of(cls, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None) -> StepPipeline[IN, OUT]:
        """
        !!! PRIVATE _OF CLASS METHOD !!!

        Returns a StepPipeline[IN, OUT] by using the given last step and the given head step

        Args:
            step: the last step
            head: the head step

        Returns:
            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
            OUT value
        """
        return StepPipeline(cls.__CREATE_KEY, step=step, head=head)

    def __init__(self, create_key: object, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None):
        """
        StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT
        value
        """
        assert create_key == self.__CREATE_KEY, "StepPipeline private constructor! Please use StepPipeline.of"
        require_non_none(step)
        self._head: _Step[IN, Any] = require_non_none_else(head, step)
        self._last: _Step[Any, OUT] = step

    def add_handler(self, next_handler: Function[OUT, StepResult[N]]) -> StepPipeline[IN, N]:
        """
        Adds a new step to the pipeline by using the given handler and returns the new Pipeline

        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
        stop the current pipeline at this step `go_next` field must be set to False.

        Args:
            next_handler: the new step handler

        Returns:
            StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N
            value

        """
        return StepPipeline._of(step=self._last.add_next(next_handler), head=self._head)

    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
        """
        Executes the current pipeline and returns the last executed Step

        Note: The last executed step can be an intermediate one if `go_next` is false. If `go_next` is True it means
        that the pipeline was totally executed for the given value.

        Args:
            value: The value of IN type

        Returns:
            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False

        """
        return self._head.execute(value)
@dataclass
class StepResult(Generic[T]):
    """
    Represents a Step result

    Args:
        value: the result value of T type
        go_next: True if the next step must be executed, otherwise False
    """
    value: T
    go_next: bool = True
Represents a Step result
Arguments:
- value: the result value of T type
- go_next: True if the next step must be executed, otherwise False
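A handler signals "continue" or "stop" purely through the `StepResult` it returns. The sketch below uses a stand-in dataclass that mirrors the `StepResult` definition, so it runs without installing dev4py; `parse_port` is a hypothetical handler invented for illustration.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


# Stand-in mirroring the StepResult dataclass (assumption: same two fields,
# same default), defined locally so the example is self-contained.
@dataclass
class StepResult(Generic[T]):
    value: T
    go_next: bool = True


def parse_port(raw: str) -> StepResult[int]:
    """Hypothetical handler: asks the pipeline to stop on an invalid port."""
    if not raw.isdecimal() or not 0 < int(raw) < 65536:
        # value carries a fallback; go_next=False requests a stop at this step
        return StepResult(value=-1, go_next=False)
    return StepResult(value=int(raw))  # go_next defaults to True


ok = parse_port("8080")
bad = parse_port("http")
```

Here `ok` lets the next step run, while `bad` would make a pipeline return immediately with the fallback value.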
class StepPipeline(Generic[IN, OUT]):
    def __init__(self, create_key: object, step: _Step[Any, OUT], head: Optional[_Step[IN, Any]] = None):
        """
        StepPipeline private constructor: Constructs a Pipeline which consumes an IN value and last step consumes a OUT
        value
        """
        assert create_key == self.__CREATE_KEY, "StepPipeline private constructor! Please use StepPipeline.of"
        require_non_none(step)
        self._head: _Step[IN, Any] = require_non_none_else(head, step)
        self._last: _Step[Any, OUT] = step
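StepPipeline private constructor: builds a pipeline that consumes an IN value and whose last step produces an OUT value. It asserts that the caller passed the class's private creation key, so pipelines should be created with StepPipeline.of instead.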
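The create-key guard used by this constructor can be sketched in isolation. `Token` below is a hypothetical class, not part of dev4py; it only reproduces the pattern of a name-mangled class attribute that the factory method passes to `__init__`.

```python
from typing import Final


class Token:
    """Hypothetical class using the same create-key guard as StepPipeline."""

    # Name-mangled to _Token__CREATE_KEY, so outside code cannot reach it
    # by accident.
    __CREATE_KEY: Final[object] = object()

    @classmethod
    def of(cls, name: str) -> "Token":
        # The factory is the only caller that supplies the private key
        return cls(cls.__CREATE_KEY, name)

    def __init__(self, create_key: object, name: str):
        assert create_key is Token.__CREATE_KEY, "Token private constructor! Please use Token.of"
        self.name = name


token = Token.of("demo")

try:
    Token(object(), "demo")  # wrong key: the assertion rejects the call
    rejected = False
except AssertionError:
    rejected = True
```

Because the guard is an `assert`, it is skipped when Python runs with the `-O` option, so it documents intent rather than enforcing it strictly.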
    @classmethod
    def of(cls, handler: Function[IN, StepResult[OUT]]) -> StepPipeline[IN, OUT]:
        """
        Returns a StepPipeline[IN, OUT] initialized with the given handler

        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
        stop the current pipeline at this step `go_next` field must be set to False.

        Args:
            handler: the first step handler

        Returns:
            StepPipeline[IN, OUT]: The new pipeline StepPipeline which consumes an IN value and last step consumes an
            OUT value

        """
        return cls._of(step=_Step(handler))
Returns a StepPipeline[IN, OUT] initialized with the given handler.
Note: the handler must return a StepResult. The function's result goes in the value field; to stop the pipeline at this step, set the go_next field to False.
Arguments:
- handler: the first step handler
Returns:
StepPipeline[IN, OUT]: the new pipeline, which consumes an IN value and whose last step produces an OUT value
    def add_handler(self, next_handler: Function[OUT, StepResult[N]]) -> StepPipeline[IN, N]:
        """
        Adds a new step to the pipeline by using the given handler and returns the new Pipeline

        Note: the handler returned value is a `StepResult`. The function result value must be put in `value` field. To
        stop the current pipeline at this step `go_next` field must be set to False.

        Args:
            next_handler: the new step handler

        Returns:
            StepPipeline[IN, N]: The new pipeline StepPipeline which consumes an IN value and last step consumes an N
            value

        """
        return StepPipeline._of(step=self._last.add_next(next_handler), head=self._head)
Adds a new step to the pipeline by using the given handler and returns the new pipeline.
Note: the handler must return a StepResult. The function's result goes in the value field; to stop the pipeline at this step, set the go_next field to False.
Arguments:
- next_handler: the new step handler, which consumes the OUT value produced by the current last step
Returns:
StepPipeline[IN, N]: the new pipeline, which consumes an IN value and whose last step produces an N value
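To illustrate how handlers chain and how the value type can change from step to step, here is a minimal sketch. `Step` and `Pipeline` are simplified stand-ins written for this example (no generics, no create-key guard, plain `None` instead of `JOptional`); they follow the linking logic of the listed source but are not the dev4py classes themselves.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class StepResult:
    value: Any
    go_next: bool = True


class Step:
    """Simplified stand-in for the private _Step class."""

    def __init__(self, handler: Callable[[Any], StepResult]):
        self.handler = handler
        self.next: Optional["Step"] = None

    def execute(self, value: Any) -> StepResult:
        result = self.handler(value)
        if result.go_next and self.next is not None:
            return self.next.execute(result.value)
        return result


class Pipeline:
    """Simplified stand-in for StepPipeline."""

    def __init__(self, last: Step, head: Optional[Step] = None):
        self.head = head if head is not None else last
        self.last = last

    @classmethod
    def of(cls, handler: Callable[[Any], StepResult]) -> "Pipeline":
        return cls(Step(handler))

    def add_handler(self, handler: Callable[[Any], StepResult]) -> "Pipeline":
        new_step = Step(handler)
        self.last.next = new_step  # links the new step onto the shared chain
        return Pipeline(new_step, head=self.head)

    def execute(self, value: Any) -> StepResult:
        return self.head.execute(value)


parse = Pipeline.of(lambda s: StepResult(int(s)))                # str -> int
doubled = parse.add_handler(lambda n: StepResult(n * 2))          # int -> int
labelled = doubled.add_handler(lambda n: StepResult(f"n={n}"))    # int -> str

result = labelled.execute("21")
shared = parse.execute("21")
```

In this stand-in, `add_handler` links the new step onto the existing chain in place, so `parse` and `labelled` share the same head and `shared` equals `result`. The listed source links steps the same way through `_last.add_next`, so it seems safest to treat the returned pipeline as the one to keep using.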
    def execute(self, value: IN) -> StepResult[Union[OUT, Any]]:
        """
        Executes the current pipeline and returns the last executed Step

        Note: The last executed step can be an intermediate one if `go_next` is false. If `go_next` is True it means
        that the pipeline was totally executed for the given value.

        Args:
            value: The value of IN type

        Returns:
            StepResult[Union[OUT, Any]]: The last executed StepResult. Can be an intermediate step if `go_next` is False

        """
        return self._head.execute(value)
Executes the pipeline and returns the StepResult of the last executed step.
Note: the last executed step can be an intermediate one when go_next is False. If go_next is True, every step of the pipeline was executed for the given value.
Arguments:
- value: the input value, of type IN
Returns:
StepResult[Union[OUT, Any]]: the StepResult of the last executed step, which can be an intermediate step if go_next is False
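The stop-on-`go_next` behavior can also be read as a plain loop. The sketch below is an iterative equivalent written for illustration only: `run_steps` and the three handlers are hypothetical names, and the local `StepResult` is a stand-in for the dev4py dataclass.

```python
from dataclasses import dataclass
from typing import Any, Callable, Sequence


@dataclass
class StepResult:
    value: Any
    go_next: bool = True


def run_steps(handlers: Sequence[Callable[[Any], StepResult]], value: Any) -> StepResult:
    """Hypothetical loop equivalent of the recursive step execution."""
    result = StepResult(value)
    for handler in handlers:
        result = handler(result.value)
        if not result.go_next:
            break  # a step asked to stop: its result is returned as-is
    return result


def strip(text: str) -> StepResult:
    return StepResult(text.strip())


def reject_empty(text: str) -> StepResult:
    # Stops the run when nothing is left after stripping
    return StepResult(text, go_next=bool(text))


def shout(text: str) -> StepResult:
    return StepResult(text.upper() + "!")


handlers = [strip, reject_empty, shout]
done = run_steps(handlers, "  hello ")
stopped = run_steps(handlers, "   ")
```

`done` comes from the final step with `go_next` set to True, whereas `stopped` is the result of `reject_empty`, the intermediate step that halted the run. Callers can therefore check `go_next` on the returned result to tell a complete run from an early stop, provided the final handler itself leaves `go_next` at True.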