dev4py.utils.pipeline.simple_pipeline

The SimplePipeline class is used in order to create a Pipeline with one operation per step

 1"""The `SimplePipeline` class is used in order to create a Pipeline with one operation per step"""
 2
 3# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py).
 4#
 5# Licensed under the Apache License, Version 2.0 (the "License");
 6# you may not use this file except in compliance with the License.
 7# You may obtain a copy of the License at
 8#
 9#      https://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17from __future__ import annotations
18
19from functools import partial
20from typing import Generic, Final, cast
21
22from dev4py.utils.objects import require_non_none
23from dev4py.utils.types import OUT, IN, Function, R
24
25
26class SimplePipeline(Generic[IN, OUT]):
27    """A `SimplePipeline` with input of IN type and output of OUT type"""
28
29    __CREATE_KEY: Final[object] = object()
30
31    @classmethod
32    def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
33        """
34        Returns a SimplePipeline[IN, OUT] initialized with the given handler
35
36        Args:
37            handler: The first pipeline handler
38
39        Returns:
40            SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation
41
42        """
43        return SimplePipeline(handler, cls.__CREATE_KEY)
44
45    def __init__(self, handler: Function[IN, OUT], create_key: object):
46        """SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value"""
47        assert create_key == self.__CREATE_KEY, "SimplePipeline private constructor! Please use SimplePipeline.of"
48        self._handler: Function[IN, OUT] = require_non_none(handler)
49
50    def _add_handler_lambda(self, value: IN, handler: Function[OUT, R]) -> R:
51        """
52        private method to describe add_handler method new handler
53        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
54        """
55        # lambda value: handler(self._handler(value))
56        return handler(self._handler(value))
57
58    def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
59        """
60        Adds an operation/operation/step to the pipeline
61
62        Args:
63            handler: The handler to add
64
65        Returns:
66            SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type
67
68        """
69        require_non_none(handler)
70        return SimplePipeline.of(cast(Function[IN, R], partial(self._add_handler_lambda, handler=handler)))
71
72    def execute(self, value: IN) -> OUT:
73        """
74        Executes the current pipeline on the given value of IN type
75
76        Args:
77            value: The value of IN type
78
79        Returns:
80            OUT: the pipeline output of OUT type
81        """
82        return self._handler(value)
class SimplePipeline(typing.Generic[~IN, ~OUT]):
27class SimplePipeline(Generic[IN, OUT]):
28    """A `SimplePipeline` with input of IN type and output of OUT type"""
29
30    __CREATE_KEY: Final[object] = object()
31
32    @classmethod
33    def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
34        """
35        Returns a SimplePipeline[IN, OUT] initialized with the given handler
36
37        Args:
38            handler: The first pipeline handler
39
40        Returns:
41            SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation
42
43        """
44        return SimplePipeline(handler, cls.__CREATE_KEY)
45
46    def __init__(self, handler: Function[IN, OUT], create_key: object):
47        """SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value"""
48        assert create_key == self.__CREATE_KEY, "SimplePipeline private constructor! Please use SimplePipeline.of"
49        self._handler: Function[IN, OUT] = require_non_none(handler)
50
51    def _add_handler_lambda(self, value: IN, handler: Function[OUT, R]) -> R:
52        """
53        private method to describe add_handler method new handler
54        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
55        """
56        # lambda value: handler(self._handler(value))
57        return handler(self._handler(value))
58
59    def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
60        """
61        Adds an operation/operation/step to the pipeline
62
63        Args:
64            handler: The handler to add
65
66        Returns:
67            SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type
68
69        """
70        require_non_none(handler)
71        return SimplePipeline.of(cast(Function[IN, R], partial(self._add_handler_lambda, handler=handler)))
72
73    def execute(self, value: IN) -> OUT:
74        """
75        Executes the current pipeline on the given value of IN type
76
77        Args:
78            value: The value of IN type
79
80        Returns:
81            OUT: the pipeline output of OUT type
82        """
83        return self._handler(value)

A SimplePipeline with input of IN type and output of OUT type

SimplePipeline(handler: Callable[[~IN], ~OUT], create_key: object)
46    def __init__(self, handler: Function[IN, OUT], create_key: object):
47        """SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value"""
48        assert create_key == self.__CREATE_KEY, "SimplePipeline private constructor! Please use SimplePipeline.of"
49        self._handler: Function[IN, OUT] = require_non_none(handler)

SimplePipeline private constructor: Constructs a Pipeline which processes IN to OUT value

@classmethod
def of( cls, handler: Callable[[~IN], ~OUT]) -> SimplePipeline[~IN, ~OUT]:
32    @classmethod
33    def of(cls, handler: Function[IN, OUT]) -> SimplePipeline[IN, OUT]:
34        """
35        Returns a SimplePipeline[IN, OUT] initialized with the given handler
36
37        Args:
38            handler: The first pipeline handler
39
40        Returns:
41            SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation
42
43        """
44        return SimplePipeline(handler, cls.__CREATE_KEY)

Returns a SimplePipeline[IN, OUT] initialized with the given handler

Arguments:
  • handler: The first pipeline handler
Returns:

SimplePipeline[IN, OUT]: The pipeline with the given handler as first operation

def add_handler( self, handler: Callable[[~OUT], ~R]) -> SimplePipeline[~IN, ~R]:
59    def add_handler(self, handler: Function[OUT, R]) -> SimplePipeline[IN, R]:
60        """
61        Adds an operation/operation/step to the pipeline
62
63        Args:
64            handler: The handler to add
65
66        Returns:
67            SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type
68
69        """
70        require_non_none(handler)
71        return SimplePipeline.of(cast(Function[IN, R], partial(self._add_handler_lambda, handler=handler)))

Adds an operation/operation/step to the pipeline

Arguments:
  • handler: The handler to add
Returns:

SimplePipeline[IN, R]: The new pipeline where input is still of IN type but output is now of R type

def execute(self, value: ~IN) -> ~OUT:
73    def execute(self, value: IN) -> OUT:
74        """
75        Executes the current pipeline on the given value of IN type
76
77        Args:
78            value: The value of IN type
79
80        Returns:
81            OUT: the pipeline output of OUT type
82        """
83        return self._handler(value)

Executes the current pipeline on the given value of IN type

Arguments:
  • value: The value of IN type
Returns:

OUT: the pipeline output of OUT type