dev4py.utils.pipeline.simple_pipeline
The SimplePipeline
class is used in order to create a Pipeline with one operation per step
"""The `SimplePipeline` class is used in order to create a Pipeline with one operation per step"""

# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from functools import partial
from typing import Generic, Final, cast

from dev4py.utils.objects import require_non_none
from dev4py.utils.types import OUT, IN, Function, R


class SimplePipeline(Generic[IN, OUT]):
    """A `SimplePipeline` with input of IN type and output of OUT type"""

    # Sentinel passed by `of` to the constructor: it is what makes the constructor "private"
    __CREATE_KEY: Final[object] = object()

    @classmethod
    def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
        """
        Returns a SimplePipeline[IN, OUT] initialized with the given handler

        Args:
            handler: The first pipeline handler

        Returns:
            SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation

        """
        return SimplePipeline(handler, cls.__CREATE_KEY)

    def __init__(self, handler: Function[IN, OUT], create_key: object):
        """
        SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value

        Args:
            handler: The pipeline handler (checked with `require_non_none`)
            create_key: The class private creation key (only `SimplePipeline.of` is expected to provide it)

        Raises:
            AssertionError: if `create_key` is not the class private creation key
        """
        # Explicit raise (instead of an `assert` statement) so the guard is still enforced when Python runs
        # with -O, and identity check so that an object with a permissive `__eq__` cannot pass as the key
        if create_key is not self.__CREATE_KEY:
            raise AssertionError("SimplePipeline private constructor! Please use SimplePipeline.of")
        self._handler: Function[IN, OUT] = require_non_none(handler)

    def _add_handler_lambda(self, value: IN, handler: Function[OUT, R]) -> R:
        """
        private method to describe add_handler method new handler
        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
        """
        # Equivalent to: lambda value: handler(self._handler(value))
        return handler(self._handler(value))

    def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
        """
        Adds an operation/step to the pipeline

        Args:
            handler: The handler to add

        Returns:
            SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type

        """
        require_non_none(handler)
        return SimplePipeline.of(cast(Function[IN, R], partial(self._add_handler_lambda, handler=handler)))

    def execute(self, value: IN) -> OUT:
        """
        Executes the current pipeline on the given value of IN type

        Args:
            value: The value of IN type

        Returns:
            OUT: the pipeline output of OUT type
        """
        return self._handler(value)
class
SimplePipeline(typing.Generic[~IN, ~OUT]):
class SimplePipeline(Generic[IN, OUT]):
    """A `SimplePipeline` with input of IN type and output of OUT type"""

    # Sentinel passed by `of` to the constructor: it is what makes the constructor "private"
    __CREATE_KEY: Final[object] = object()

    @classmethod
    def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
        """
        Returns a SimplePipeline[IN, OUT] initialized with the given handler

        Args:
            handler: The first pipeline handler

        Returns:
            SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation

        """
        return SimplePipeline(handler, cls.__CREATE_KEY)

    def __init__(self, handler: Function[IN, OUT], create_key: object):
        """
        SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value

        Args:
            handler: The pipeline handler (checked with `require_non_none`)
            create_key: The class private creation key (only `SimplePipeline.of` is expected to provide it)

        Raises:
            AssertionError: if `create_key` is not the class private creation key
        """
        # Explicit raise (instead of an `assert` statement) so the guard is still enforced when Python runs
        # with -O, and identity check so that an object with a permissive `__eq__` cannot pass as the key
        if create_key is not self.__CREATE_KEY:
            raise AssertionError("SimplePipeline private constructor! Please use SimplePipeline.of")
        self._handler: Function[IN, OUT] = require_non_none(handler)

    def _add_handler_lambda(self, value: IN, handler: Function[OUT, R]) -> R:
        """
        private method to describe add_handler method new handler
        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
        """
        # Equivalent to: lambda value: handler(self._handler(value))
        return handler(self._handler(value))

    def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
        """
        Adds an operation/step to the pipeline

        Args:
            handler: The handler to add

        Returns:
            SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type

        """
        require_non_none(handler)
        return SimplePipeline.of(cast(Function[IN, R], partial(self._add_handler_lambda, handler=handler)))

    def execute(self, value: IN) -> OUT:
        """
        Executes the current pipeline on the given value of IN type

        Args:
            value: The value of IN type

        Returns:
            OUT: the pipeline output of OUT type
        """
        return self._handler(value)
A SimplePipeline
with input of IN type and output of OUT type
SimplePipeline(handler: Callable[[~IN], ~OUT], create_key: object)
def __init__(self, handler: Function[IN, OUT], create_key: object):
    """
    SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value

    Args:
        handler: The pipeline handler (checked with `require_non_none`)
        create_key: The class private creation key (only `SimplePipeline.of` is expected to provide it)

    Raises:
        AssertionError: if `create_key` is not the class private creation key
    """
    # Explicit raise (instead of an `assert` statement) so the guard is still enforced when Python runs
    # with -O, and identity check so that an object with a permissive `__eq__` cannot pass as the key
    if create_key is not self.__CREATE_KEY:
        raise AssertionError("SimplePipeline private constructor! Please use SimplePipeline.of")
    self._handler: Function[IN, OUT] = require_non_none(handler)
SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value
@classmethod
def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
    """
    Builds a SimplePipeline[IN, OUT] whose first operation is the given handler

    Args:
        handler: The first pipeline handler

    Returns:
        SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation

    """
    # The private creation key is what allows this factory to call the "private" constructor
    creation_key: object = cls.__CREATE_KEY
    return SimplePipeline(handler, creation_key)
Returns a SimplePipeline[IN, OUT] initialized with the given handler
Arguments:
- handler: The first pipeline handler
Returns:
SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation
def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
    """
    Adds an operation/step to the pipeline

    Args:
        handler: The handler to add

    Returns:
        SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type

    """
    require_non_none(handler)
    # New step: the current pipeline handler followed by the given handler, built as a partial
    # of `_add_handler_lambda`
    chained_step = partial(self._add_handler_lambda, handler=handler)
    return SimplePipeline.of(cast(Function[IN, R], chained_step))
Adds an operation/step to the pipeline
Arguments:
- handler: The handler to add
Returns:
SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type
def
execute(self, value: ~IN) -> ~OUT:
def execute(self, value: IN) -> OUT:
    """
    Runs the current pipeline handler on the given value of IN type

    Args:
        value: The value of IN type

    Returns:
        OUT: the pipeline output of OUT type
    """
    result: OUT = self._handler(value)
    return result
Executes the current pipeline on the given value of IN type
Arguments:
- value: The value of IN type
Returns:
OUT: the pipeline output of OUT type