dev4py.utils.stream

Stream module inspired by java.util.stream
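
A minimal usage sketch (illustrative values and lambdas, not part of the generated API reference), based on the
Stream methods defined in the source below:

    from dev4py.utils.stream import Stream

    # Describe the transformations lazily, then trigger them with a terminal operation
    names: list[str] = (
        Stream.of("alpha", "beta", "gamma")
        .filter(lambda s: len(s) > 4)
        .map(str.upper)
        .to_list()
    )
    # names == ["ALPHA", "GAMMA"]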

   1"""Stream module inspired by `java.util.stream`"""  # pylint: disable=C0302
   2
   3# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py).
   4#
   5# Licensed under the Apache License, Version 2.0 (the "License");
   6# you may not use this file except in compliance with the License.
   7# You may obtain a copy of the License at
   8#
   9#      https://www.apache.org/licenses/LICENSE-2.0
  10#
  11# Unless required by applicable law or agreed to in writing, software
  12# distributed under the License is distributed on an "AS IS" BASIS,
  13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14# See the License for the specific language governing permissions and
  15# limitations under the License.
  16
  17from __future__ import annotations
  18
  19from concurrent.futures import Executor, wait, FIRST_COMPLETED, Future, as_completed
  20from dataclasses import dataclass
  21from functools import partial, cmp_to_key
  22from typing import Generic, Final, Optional, cast, Any, Collection, Iterable, Iterator, Self
  23
  24from dev4py.utils import collectors
  25from dev4py.utils.collectors import Collector
  26from dev4py.utils.iterables import get_chunks
  27from dev4py.utils.joptional import JOptional
  28from dev4py.utils.objects import require_non_none, require_non_none_else_get, to_self
  29from dev4py.utils.pipeline import StepPipeline, StepResult
  30from dev4py.utils.types import T, Function, R, V, Predicate, Supplier, BiConsumer, K, Consumer, BiFunction
  31
  32
  33##############################
  34#  PRIVATE MODULE FUNCTIONS  #
  35##############################
  36def _default_handler(v: V) -> StepResult[R]:
  37    """
  38    private function to describe default pipeline handler
   39    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  40    """
  41    # lambda v: StepResult(cast(R, v))
  42    return StepResult(cast(R, v))
  43
  44
  45def _root_pipeline() -> StepPipeline[V, R]:
  46    """
  47    private function to describe default pipeline
   48    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  49    """
  50    return StepPipeline.of(_default_handler)
  51
  52
  53def _none_collector() -> Collector[T, R]:
  54    """
  55    private function to cast none_collector
   56    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  57    """
  58    return cast(Collector[T, R], collectors.to_none())
  59
  60
  61def _map_lambda(v: T, mapper: Function[T, R]) -> StepResult[R]:
  62    """
  63    private function to describe map method handler
   64    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  65    """
  66    # lambda v: StepResult(value=mapper(v))
  67    return StepResult(value=mapper(v))
  68
  69
  70def _filter_lambda(v: T, predicate: Predicate[T]) -> StepResult[T]:
  71    """
  72    private function to describe filter method handler
   73    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  74    """
  75    # lambda v: StepResult(value=v, go_next=predicate(v))
  76    return StepResult(value=v, go_next=predicate(v))
  77
  78
  79def _and_lambda(b1: bool, b2: bool) -> bool:
  80    """
   81    private function to describe an AND BiFunction
   82    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
  83    """
  84    # lambda b1, b2: b1 and b2
  85    return b1 and b2
  86
  87
  88def _stream_to_list_lambda(stream: Stream[T]) -> list[T]:
  89    """
  90    private function to describe a stream to list Function
   91    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
   92    Note: use lists because generators cannot be used in a parallel context (generators are not serializable)
  93    """
  94    # lambda stream: stream.to_list()
  95    return stream.to_list()
  96
  97
  98def _peek_mapper(value: T, consumer: Consumer[T]) -> T:
  99    """
 100    private function to describe peek method mapper
  101    Note: inner functions are not used in order to be compatible with multiprocessing (inner functions are not
  102    serializable)
 103    """
 104    consumer(value)
 105    return value
 106
 107
 108def _natural_comparator(o1: Any, o2: Any) -> int:
 109    """
 110    private function to describe a natural order comparator
  111    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 112    """
 113    return 0 if o1 == o2 else 1 if o1 > o2 else -1  # pragma: no mutate
 114
 115
 116def _to_max_comparator(o1: T, o2: T, comparator: BiFunction[T, T, int]) -> int:
 117    """
 118    private function to describe a reverse order comparator
  119    Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 120    """
 121    return -comparator(o1, o2)
 122
 123
 124def _sync_execution(
 125        values: Iterable[V],
 126        pipeline: StepPipeline[V, T],
 127        stop_on_first_completed: bool,
 128        collector: Collector[T, R]
 129) -> R:
 130    """
 131    private function to describe a sync stream execution
 132    Note: Outside the Stream class in order to be compatible with multiprocessing
 133    """
 134    return _process_values(values, pipeline, stop_on_first_completed, collector)[0]
 135
 136
 137def _parallel_execution(  # pylint: disable=R0913
 138        values: Iterable[V],
 139        pipeline: StepPipeline[V, T],
 140        parallel_config: ParallelConfiguration,
 141        ordered_execution: bool,
 142        stop_on_first_completed: bool,
 143        collector: Collector[T, R]
 144) -> R:
 145    """
 146    private function to describe a parallel stream execution
 147    Note: Outside the Stream class in order to be compatible with multiprocessing
 148    """
 149    chunksize: int = parallel_config.chunksize
 150    executor: Executor = parallel_config.executor
 151    process_values: Final[partial[tuple[R, bool]]] = \
 152        partial(
 153            _process_values, pipeline=pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector
 154        )
 155    futures: Final[list[Future[tuple[R, bool]]]] = \
 156        [executor.submit(process_values, chunk) for chunk in get_chunks(values, chunksize)]
 157
 158    return (_ordered_parallel_execution if ordered_execution else _unordered_parallel_execution)(
 159        futures=futures,
 160        collector=collector
 161    )
 162
 163
 164def _unordered_parallel_execution(futures: list[Future[tuple[R, bool]]], collector: Collector[T, R]) -> R:
 165    """
 166    private function to describe an unordered parallel stream execution
 167    Note: Outside the Stream class in order to be compatible with multiprocessing
 168    """
 169    done: Collection[Future[tuple[R, bool]]]
 170    not_done: Collection[Future[tuple[R, bool]]] = futures
 171
 172    try:
 173        results: R = collector.supplier()
 174        continue_processing: bool = True
 175
 176        # 'do {...} while' doesn't exist in python :'(
 177        while not_done and (continue_processing is True):  # pragma: no mutate
 178            done, not_done = wait(not_done, return_when=FIRST_COMPLETED)
 179            for future in done:
 180                chunk_result: tuple[R, bool] = future.result()
 181                results = collector.combiner(results, chunk_result[0])
 182                continue_processing = chunk_result[1]
 183                if continue_processing is False:
 184                    break  # pragma: no mutate
 185
 186        return results
 187    finally:
 188        for future in not_done:
 189            future.cancel()
 190
 191
 192def _ordered_parallel_execution(futures: list[Future[tuple[R, bool]]], collector: Collector[T, R]) -> R:
 193    """
 194    private function to describe an ordered parallel stream execution
 195    Note: Outside the Stream class in order to be compatible with multiprocessing
 196    """
 197    results: R = collector.supplier()
 198    processed_value: int = 0  # pragma: no mutate
 199    try:
 200        for future in futures:
 201            processed_value += 1  # pragma: no mutate
 202            chunk_result: tuple[R, bool] = future.result()
 203            results = collector.combiner(results, chunk_result[0])
 204            if chunk_result[1] is False:
 205                break  # pragma: no mutate
 206    finally:
 207        for i in range(processed_value, len(futures)):
 208            futures[i].cancel()
 209
 210    return results
 211
 212
 213def _process_values(
 214        values: Iterable[V],
 215        pipeline: StepPipeline[V, T],
 216        stop_on_first_completed: bool,
 217        collector: Collector[T, R]
 218) -> tuple[R, bool]:
 219    """
  220    private function to describe the processing of a set of stream values
 221    Note: Outside the Stream class in order to be compatible with multiprocessing
 222    """
 223    continue_processing: bool = True
 224    # Note: Always ordered
 225    results: R = collector.supplier()
 226    for result in _sync_generator(values, pipeline):
 227        results = collector.accumulator(results, result)
 228        if stop_on_first_completed:
 229            continue_processing = False  # pragma: no mutate
 230            break  # pragma: no mutate
 231
 232    return results, continue_processing
 233
 234
 235def _sync_generator(values: Iterable[V], pipeline: StepPipeline[V, T]) -> Iterable[T]:
 236    """
 237    private function to describe a sync Stream generator
 238    Note: Outside the Stream class in order to be compatible with multiprocessing
 239    """
 240    for value in values:
 241        result: StepResult[Any] = pipeline.execute(value)
 242        # Note: go_next TRUE means the value has gone through the whole pipeline and has not been filtered
 243        if result.go_next:
 244            yield result.value
 245
 246
 247def _parallel_generator(
 248        values: Iterable[V],
 249        pipeline: StepPipeline[V, T],
 250        parallel_config: ParallelConfiguration,
 251        ordered_execution: bool
 252) -> Iterable[T]:
 253    """
 254    private function to describe a parallel Stream generator (ordered or not)
 255    Note: Outside the Stream class in order to be compatible with multiprocessing
 256    """
 257    chunksize: int = parallel_config.chunksize
 258    executor: Executor = parallel_config.executor
 259    process_values: Final[partial[tuple[list[T], bool]]] = \
 260        partial(_process_values, pipeline=pipeline, stop_on_first_completed=False, collector=collectors.to_list())
 261    futures: Final[list[Future[tuple[list[T], bool]]]] = \
 262        [executor.submit(process_values, chunk) for chunk in get_chunks(values, chunksize)]
 263    try:
 264        futures_iterable: Final[Iterable[Future[tuple[list[T], bool]]]] = \
 265            (futures if ordered_execution else as_completed(futures))
 266        for future in futures_iterable:
 267            yield from future.result()[0]
 268
 269    except BaseException as e:
 270        for future in futures:
 271            future.cancel()
 272        raise e
 273
 274
 275##############################
 276#       PUBLIC CLASSES       #
 277##############################
 278@dataclass(frozen=True)
 279class ParallelConfiguration:
 280    """
 281    ParallelConfiguration class that describes Stream parallel configuration when using parallel execution
 282
 283    Args:
 284        executor (Executor): The executor to use for parallel (/concurrent) execution
 285        chunksize (int): If greater than one, the stream values will be chopped into chunks of size chunksize and
 286            submitted to the process pool. If set to one, the items in the stream will be sent one at a time.
 287
 288    Raises:
 289        TypeError: if executor is None
 290        ValueError: if chunksize is less than 1
 291    """
 292    executor: Executor
 293    chunksize: int = 1
 294
 295    def __post_init__(self):
 296        require_non_none(self.executor, "executor must be non None")
 297        if require_non_none(self.chunksize, "chunksize must be non None") < 1:
  298            raise ValueError("chunksize must be greater than or equal to 1")
 299
 300
 301class Stream(Generic[T]):  # pylint: disable=R0904
 302    __CREATE_KEY: Final[object] = object()
 303
 304    @classmethod
 305    def empty(cls) -> Stream[T]:
 306        """
 307        Returns an empty sequential Stream
 308
 309        Returns:
 310            Stream[T]: An empty sequential Stream
 311        """
 312        return cls.of()
 313
 314    @classmethod
 315    def of(cls, *values: T) -> Stream[T]:
 316        """
 317        Returns a sequential ordered stream whose elements are the specified values
 318
 319        Args:
 320            *values: ordered stream elements
 321
 322        Returns:
 323            Stream[T]: A sequential ordered stream whose elements are the specified values
 324        """
 325        return cls.of_iterable(values)
 326
 327    @classmethod
 328    def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]:
 329        """
 330        Returns a sequential ordered stream whose elements are values from the given Iterable
 331
 332        Args:
 333            iterable: iterable of stream source values
 334
 335        Returns:
 336            Stream[T]: A sequential ordered stream whose elements are values from the given Iterable
 337
 338        Raises:
 339            TypeError: if iterable is None
 340        """
 341        require_non_none(iterable)
 342        return cls._of(lambda p, o: iterable)
 343
 344    @classmethod
 345    def _of(
 346            cls,
 347            values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]],
 348            pipeline: Optional[StepPipeline[V, R]] = None,
 349            parallel_config: Optional[JOptional[ParallelConfiguration]] = None,
 350            ordered_execution: bool = False
 351    ) -> Stream[R]:
 352        """
 353        Private class method in order to simplify the call to Stream private constructor
 354
 355        Args:
 356            values_function: The BiFunction which provides the Stream values
  357            pipeline: The StepPipeline to be executed, if present, on the Stream values at terminal operation
 358            parallel_config: The Stream parallel configuration
  359            ordered_execution: True if the values must be returned in encounter order
 360
 361        Returns:
 362            Stream[T]: A Stream corresponding to the given parameters
 363        """
 364        return Stream(
 365            values_function=values_function,
 366            pipeline=require_non_none_else_get(pipeline, _root_pipeline),
 367            parallel_config=require_non_none_else_get(parallel_config, JOptional.empty),
 368            ordered_execution=ordered_execution,
 369            create_key=cls.__CREATE_KEY
 370        )
 371
 372    def __init__(  # pylint: disable=R0913
 373            self,
 374            values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]],
 375            pipeline: StepPipeline[V, T],
 376            parallel_config: JOptional[ParallelConfiguration],
 377            ordered_execution: bool,
 378            create_key: object,
 379    ):
 380        """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`"""
 381        assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of"
 382        self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \
 383            require_non_none(values_function)
 384        self._pipeline: StepPipeline[V, T] = require_non_none(pipeline)
 385        self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config)
 386        self._ordered_execution: bool = require_non_none(ordered_execution)
 387
 388    def map(self, mapper: Function[T, R]) -> Stream[R]:
 389        """
 390        Returns a stream consisting of the results of applying the given function to the elements of this stream
 391
 392        Args:
 393            mapper (Function[T, R]): The function to apply to each element
 394
 395        Returns:
 396            Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream
 397
 398        Raises:
 399            TypeError: if mapper is None
 400        """
 401        require_non_none(mapper)
 402        return Stream._of(
 403            values_function=self._values_function,
 404            # pylint: disable=E1101
 405            pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))),
 406            parallel_config=self._parallel_config,
 407            ordered_execution=self._ordered_execution
 408        )
 409
 410    def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self:
 411        """
  412        Configures the Stream to use the given parallel configuration and returns the current stream
 413
  414        Note: A None parallel configuration is equivalent to calling the `sequential()` method
 415
 416        Args:
 417            parallel_config: The parallel configuration to use
 418
 419        Returns:
  420            Stream[T]: The current Stream after setting the parallel configuration
 421        """
 422        self._parallel_config = JOptional.of_noneable(parallel_config)
 423        return self
 424
 425    def is_parallel(self) -> bool:
 426        """
 427        Returns whether this stream, if a terminal operation were to be executed, would execute in parallel
 428
 429        Returns:
 430            bool: True if the Stream uses parallel configuration, otherwise False
 431        """
 432        return self._parallel_config.is_present()
 433
 434    def sequential(self) -> Self:
 435        """
  436        Configures the Stream to use sequential configuration and returns the current stream
  437        (i.e.: Removes the parallel configuration if it exists)
 438
 439        Returns:
  440            Stream[T]: The current Stream after setting the sequential configuration
 441        """
 442        if self.is_parallel():
 443            self._parallel_config = JOptional.empty()
 444        return self
 445
 446    def filter(self, predicate: Predicate[T]) -> Stream[T]:
 447        """
 448        Returns a stream consisting of the elements of this stream that match the given predicate
 449
 450        Args:
 451            predicate: The predicate to apply to each element to determine if it should be included
 452
 453        Returns:
 454            Stream[T]: A stream consisting of the elements of this stream that match the given predicate
 455
 456        Raises:
 457            TypeError: if predicate is None
 458        """
 459        return Stream._of(
 460            values_function=self._values_function,
 461            # pylint: disable=E1101
 462            pipeline=self._pipeline.add_handler(
 463                cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate)))
 464            ),
 465            parallel_config=self._parallel_config,
 466            ordered_execution=self._ordered_execution
 467        )
 468
 469    def collect(self, collector: Collector[T, R]) -> R:
 470        """
 471        Performs a reduction operation on the elements of this stream using a Collector
 472
 473        Args:
 474            collector: The Collector describing the reduction
 475
 476        Returns:
 477            R: The result of the reduction
 478
 479        Raises:
 480            TypeError: if collector is None
 481        """
 482        return self._execute(collector=require_non_none(collector))
 483
 484    def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R:
 485        """
 486        Performs a reduction operation on the elements of this stream
 487
 488        Args:
 489            supplier: A function that creates a new result container. For a parallel execution, this function may be
 490                called multiple times and must return a fresh value each time
 491            accumulator: A function that must fold an element into a result container
  492            combiner: A function that accepts two partial result containers and merges them, which must be compatible
 493                with the accumulator function. The combiner function must fold the elements from the second result
 494                container into the first result container
 495
 496        Returns:
 497            R: The result of the reduction
 498
 499        Raises:
 500            TypeError: if at least one parameter is None
 501        """
 502        return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner))
 503
 504    def to_list(self) -> list[T]:
 505        """
 506        Accumulates the elements of this stream into a list
 507
 508        Returns:
 509            list[T]: A list containing the stream elements
 510        """
 511        return self.collect(collector=collectors.to_list())
 512
 513    def to_tuple(self) -> tuple[T, ...]:
 514        """
 515        Accumulates the elements of this stream into a tuple
 516
 517        Returns:
 518            tuple[T, ...]: a tuple containing the stream elements
 519        """
 520        return self.collect(collector=collectors.to_tuple())
 521
 522    def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]:
 523        """
  524        Accumulates the elements of this stream into a dict whose keys and values are the result of applying the
 525        provided mapping functions to the input elements
 526
 527        Args:
 528            key_mapper: a mapping function to produce keys
 529            value_mapper: a mapping function to produce values
 530
 531        Returns:
 532            dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping
 533                functions to the input elements
 534        """
 535        return self.collect(collector=collectors.to_dict(key_mapper, value_mapper))
 536
 537    def all_match(self, predicate: Predicate[T]) -> bool:
 538        """
 539        Returns whether all elements of this stream match the provided predicate
 540
 541        Args:
 542            predicate: The predicate to apply to elements of this stream
 543
 544        Returns:
 545            bool: True if either all elements of the stream match the provided predicate or the stream is empty,
 546                otherwise False
 547
 548        Raises:
 549            TypeError: if predicate is None
 550        """
 551        return self.map(predicate).reduce(True, _and_lambda)
 552
 553    def any_match(self, predicate: Predicate[T]) -> bool:
 554        """
 555        Returns whether any elements of this stream match the provided predicate
 556
 557        Args:
 558            predicate: The predicate to apply to elements of this stream
 559
 560        Returns:
 561            bool: True if any elements of the stream match the provided predicate, otherwise False
 562
 563        Raises:
 564            TypeError: if predicate is None
 565        """
 566        return self.filter(predicate).find_any().is_present()
 567
 568    def count(self) -> int:
 569        """
 570        Returns the count of elements in this stream
 571
 572        Returns:
 573            int: The count of elements in this stream
 574        """
 575        return self.collect(collector=collectors.to_counter())
 576
 577    def for_each(self, consumer: Consumer[T]) -> None:
 578        """
 579        Performs an action (/consumer) for each element of this stream
 580
 581        Args:
 582            consumer: The Consumer to perform on the elements
 583
 584        Returns:
 585            None: nothing
 586
 587        Raises:
 588            TypeError: if consumer is None
 589        """
  590        # Execute the stream returned by peek so that the consumer is applied to each element
  591        self.peek(require_non_none(consumer))._execute()
 592
 593    def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]:
 594        """
 595        Returns a stream consisting of the results of replacing each element of this stream with the contents of a
 596        mapped stream produced by applying the provided mapping function to each element
 597
 598        Args:
 599            mapper: A function to apply to each element which produces a stream of new values
 600
 601        Returns:
 602             Stream[R]: The new Stream
 603
 604        Raises:
 605            TypeError: if mapper is None
 606        """
 607        return self._from_self_config(
 608            values_function=cast(
 609                BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]],
 610                partial(self._flat_map_values_function, mapper=require_non_none(mapper))
 611            )
 612        )
 613
 614    def ordered_execution(self, ordered: bool = True) -> Stream[T]:
 615        """
 616        Configures the current Stream to be ordered or not and returns it
 617
  618        Note: This configuration has no impact on a sequential stream
 619
 620        Args:
 621            ordered: True if the Stream must be ordered, False for unordered. (Default: True)
 622
 623        Returns:
 624            Stream[T]: The current Stream with ordered configuration
 625
 626        Raises:
 627            TypeError: if ordered is None
 628        """
 629        return Stream._of(
 630            values_function=self._values_function,
 631            pipeline=self._pipeline,
 632            parallel_config=self._parallel_config,
 633            ordered_execution=ordered
 634        ) if self._ordered_execution != require_non_none(ordered) else self
 635
 636    def unordered(self) -> Stream[T]:
 637        """
 638        Configures the current Stream to be unordered and returns it
 639
  640        Note: Equivalent to calling `ordered_execution(False)`
  641        Note 2: This configuration has no impact on a sequential stream
 642
 643        Returns:
 644            Stream[T]: The current Stream with unordered configuration
 645        """
 646        return self.ordered_execution(False)
 647
 648    def find_first(self) -> JOptional[T]:
 649        """
 650        Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty
 651
 652        Returns:
 653            JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream
 654                is empty
 655        """
 656        return self.ordered_execution(True).find_any()
 657
 658    def find_any(self) -> JOptional[T]:
 659        """
 660        Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty
 661
  662        Note: if the stream is ordered, find_any and find_first are equivalent
 663
 664        Returns:
 665            JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is
 666                empty
 667        """
 668        results: list[T] = self._execute(
  669            stop_on_first_completed=True,  # pragma: no mutate
  670            collector=collectors.to_list()
 671        )
 672        return JOptional.of_noneable(results[0] if results else None)
 673
 674    def distinct(self) -> Stream[T]:
 675        """
 676        Returns a stream consisting of the distinct elements of this stream
 677
 678        Returns:
 679            Stream[T]: A stream consisting of the distinct elements of this stream
 680        """
 681        return self._from_self_config(values_function=self._distinct_values_function)
 682
 683    def drop_while(self, predicate: Predicate[T]) -> Stream[T]:
 684        """
 685        Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping
 686        the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a
 687        stream consisting of the remaining elements of this stream after dropping a subset of elements that match the
 688        given predicate.
 689
 690        Args:
 691            predicate: The predicate to apply to elements to determine the longest prefix of elements
 692
 693        Returns:
 694            Stream[T]: The new Stream
 695
 696        Raises:
 697            TypeError: if predicate is None
 698        """
 699        return self._from_self_config(
 700            values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate))
 701        )
 702
 703    def limit(self, limit: int) -> Stream[T]:
 704        """
 705        Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length
 706
 707        Args:
 708            limit: The number of elements the stream should be limited to
 709
 710        Returns:
 711            Stream[T]: The truncated Stream
 712
 713        Raises:
 714            TypeError: if limit is None
 715        """
 716        return self._from_self_config(
 717            values_function=partial(self._limit_values_function, limit=require_non_none(limit)))
 718
 719    def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
 720        """
 721        Returns the maximum element of this stream according to the provided comparator BiFunction
 722
 723        Args:
 724            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 725                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 726                (default: natural comparator)
 727
 728        Returns:
 729            JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream
 730                is empty
 731
 732        Raises:
 733            TypeError: if comparator is None
 734        """
 735        return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator)))
 736
 737    def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
 738        """
 739        Returns the minimum element of this stream according to the provided comparator BiFunction
 740
 741        Args:
 742            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 743                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 744                (default: natural comparator)
 745
 746        Returns:
 747            JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream
 748                is empty
 749
 750        Raises:
 751            TypeError: if comparator is None
 752        """
 753        return self.sorted(require_non_none(comparator)).find_first()
 754
 755    def peek(self, consumer: Consumer[T]) -> Stream[T]:
 756        """
 757        Returns a stream consisting of the elements of this stream, additionally performing the provided action
 758        (/consumer) on each element as elements are consumed from the resulting stream
 759
 760        Args:
 761            consumer: The Consumer to perform on the elements as they are consumed from the stream
 762
 763        Returns:
 764            Stream[T]: The new Stream
 765
 766        Raises:
 767            TypeError: if consumer is None
 768        """
 769        return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer))))
 770
 771    def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T:
 772        """
 773        Performs a reduction on the elements of this stream, using the provided identity value and an associative
 774        accumulation function, and returns the reduced value
 775
 776        Args:
 777            identity: The identity value for the accumulating function
 778            accumulator: The BiFunction for combining two values
 779
 780        Returns:
 781            T: The result of the reduction
 782
 783        Raises:
 784            TypeError: accumulator is None
 785        """
 786        return self.collect(collectors.of(supplier=partial(to_self, obj=identity),
 787                                          accumulator=accumulator,
 788                                          combiner=accumulator
 789                                          )
 790                            )
 791
 792    def skip(self, n: int) -> Stream[T]:
 793        """
 794        Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of
 795        the stream
 796
 797        Note: If this stream contains fewer than n elements then an empty stream will be returned
 798
 799        Args:
 800            n: The number of leading elements to skip
 801
 802        Returns:
 803            Stream[T]: The new stream
 804
 805        Raises:
 806            TypeError: if n is None
 807
 808        """
 809        return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n)))
 810
 811    def take_while(self, predicate: Predicate[T]) -> Stream[T]:
 812        """
 813        Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream
 814        that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset
 815        of elements taken from this stream that match the given predicate
 816
 817        Args:
  818            predicate: The predicate to apply to elements to determine the longest prefix of elements
 819
 820        Returns:
 821            Stream[T]: The new Stream
 822
 823        Raises:
 824            TypeError: if predicate is None
 825        """
 826        return self._from_self_config(
 827            values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate))
 828        )
 829
 830    def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]:
 831        """
 832        Returns a stream consisting of the elements of this stream, sorted according to the provided comparator
 833
 834        Args:
 835            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 836                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 837                (default: natural comparator)
 838
 839        Returns:
 840            Stream[T]: The new Stream
 841
 842        Raises:
 843            TypeError: if comparator is None
 844        """
 845        self._ordered_execution = True
 846        return self._from_self_config(
 847            values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator))
 848        )
 849
 850    def to_generator(self) -> Iterator[T]:
 851        """
 852        Returns a generator based on Stream inputs and mapping
 853
 854        Note: If the Stream is sequential each value is processed when __next__() is called. Otherwise, if the Stream
 855        uses a parallel configuration, all values are processed at the first __next__() call but each value is returned
 856        one by one depending on the 'ordered/unordered' configuration
 857
  858        Note 2: When the stream uses a parallel unordered configuration, each value is returned as soon as it is available.
  859        With an ordered configuration the generator waits for each necessary value in order to respect the stream value order
 860
 861        Returns:
 862            Iterator[T]: The Stream processed values generator
 863        """
 864        # pylint: disable=E1102
 865        values: Final[Iterable[Any]] = \
 866            self._values_function(self._parallel_config.or_else(None), self._ordered_execution)
 867        if not values:
 868            return
 869
 870        if self.is_parallel():
 871            yield from _parallel_generator(
 872                values=values,
 873                pipeline=self._pipeline,
 874                parallel_config=self._parallel_config.get(),
 875                ordered_execution=self._ordered_execution
 876            )
 877        else:
 878            yield from _sync_generator(values=values, pipeline=self._pipeline)
 879
 880    def _execute(
 881            self,
 882            stop_on_first_completed: bool = False,
 883            collector: Collector[T, R] = _none_collector()
 884    ) -> R:
 885        """
 886        private method to execute the current Stream
 887        """
 888        require_non_none(stop_on_first_completed)
 889        require_non_none(collector)
 890        # pylint: disable=E1102
 891        values: Final[Optional[Iterable[Any]]] = \
 892            self._values_function(self._parallel_config.or_else(None), self._ordered_execution)
 893
 894        if not values:
 895            return collector.supplier()
 896
 897        if self.is_parallel():
 898            return _parallel_execution(
 899                values=values,
 900                pipeline=self._pipeline,
 901                parallel_config=self._parallel_config.get(),
 902                ordered_execution=self._ordered_execution,
 903                stop_on_first_completed=stop_on_first_completed,
 904                collector=collector
 905            )
 906
 907        return _sync_execution(
 908            values=values, pipeline=self._pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector
 909        )
 910
 911    def _from_self_config(
 912            self, values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]]
 913    ) -> Stream[R]:
 914        """
 915        private method that returns a new Stream by copying the current configuration
  916        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 917        """
 918        return Stream._of(
 919            values_function=values_function,
 920            parallel_config=self._parallel_config,
 921            ordered_execution=self._ordered_execution
 922        )
 923
 924    def _init_to_values_function(
 925            self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool
 926    ) -> Stream[T]:
 927        """
 928        private method that initializes the current stream with the given configuration
  929        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 930        """
 931        return self.ordered_execution(ordered=ordered_execution).parallel(parallel_config)
 932
 933    def _sorted_values_function(
 934            self, parallel_config: ParallelConfiguration, ordered_execution: bool, comparator: BiFunction[T, T, int]
 935    ) -> Iterable[T]:
 936        """
 937        private method used by `sorted` public method in replacement of lambda
  938        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 939        """
 940        values: list[T] = self._init_to_values_function(parallel_config, ordered_execution).to_list()
 941        values.sort(key=cmp_to_key(comparator))
 942        return values
 943
 944    def _flat_map_values_function(
 945            self, parallel_config: Optional[ParallelConfiguration],
 946            ordered_execution: bool,
 947            mapper: Function[T, Stream[R]]
 948    ) -> Iterable[R]:
 949        """
 950        private method used by `flat_map` public method in replacement of lambda
  951        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 952        """
 953        list_generator: Final[Iterable[list[R]]] = (
 954            self
 955            ._init_to_values_function(parallel_config, ordered_execution)
 956            .map(mapper)
 957            .map(_stream_to_list_lambda)  # type: ignore
 958            .to_generator()
 959        )
 960        for values in list_generator:
 961            yield from values
 962
 963    def _distinct_values_function(
 964            self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool
 965    ) -> Iterable[T]:
 966        """
 967        private method used by `distinct` public method in replacement of lambda
  968        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 969        """
 970        existing_values: set[T] = set()
 971        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
  972            if value not in existing_values:
 973                existing_values.add(value)
 974                yield value
 975
 976    def _drop_while_values_function(
 977            self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T]
 978    ) -> Iterable[T]:
 979        """
 980        private method used by `drop_while` public method in replacement of lambda
  981        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 982        """
 983        drop: bool = True
 984        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
 985            drop = drop and predicate(value)
 986            if drop is False:
 987                yield value
 988
 989    def _limit_values_function(
 990            self, parallel_config: ParallelConfiguration, ordered_execution: bool, limit: int
 991    ) -> Iterable[T]:
 992        """
 993        private method used by `limit` public method in replacement of lambda
  994        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
 995        """
 996        if limit <= 0:
 997            return
 998
 999        counter: int = 0
1000        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1001            yield value
1002            counter += 1
1003            if counter >= limit:
1004                break
1005
1006    def _skip_values_function(
1007            self, parallel_config: ParallelConfiguration, ordered_execution: bool, n: int
1008    ) -> Iterable[T]:
1009        """
1010        private method used by `skip` public method in replacement of lambda
 1011        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
1012        """
1013        counter: int = 0
1014        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1015            if counter >= n:
1016                yield value
1017            counter += 1
1018
1019    def _take_while_values_function(
1020            self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T]
1021    ) -> Iterable[T]:
1022        """
1023        private method used by `take_while` public method in replacement of lambda
 1024        Note: lambdas are not used in order to be compatible with multiprocessing (lambdas are not serializable)
1025        """
1026        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1027            if not predicate(value):
1028                break  # pragma: no mutate
1029            yield value
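
As an illustrative sketch (hypothetical values; lambdas are fine here because the streams are sequential, per the
serialization notes above), the terminal operations defined above can be combined as follows:

    from dev4py.utils.stream import Stream

    # reduce: fold the elements with an identity value and an associative accumulator
    total: int = Stream.of(1, 2, 3, 4).filter(lambda v: v % 2 == 0).reduce(0, lambda a, b: a + b)  # 6

    # find_first: JOptional describing the first element in encounter order
    first = Stream.of("a", "b", "c").map(str.upper).find_first()  # JOptional containing "A"

    # to_generator: lazy evaluation, each value is processed when __next__() is called on a sequential stream
    for squared in Stream.of(1, 2, 3).map(lambda v: v * v).to_generator():
        print(squared)  # 1, 4, 9
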
@dataclass(frozen=True)
class ParallelConfiguration:

ParallelConfiguration class that describes Stream parallel configuration when using parallel execution

Arguments:
  • executor (Executor): The executor to use for parallel (/concurrent) execution
  • chunksize (int): If greater than one, the stream values will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the stream will be sent one at a time.
Raises:
  • TypeError: if executor is None
  • ValueError: if chunksize is less than 1
ParallelConfiguration(executor: concurrent.futures._base.Executor, chunksize: int = 1)
executor: concurrent.futures._base.Executor
chunksize: int = 1
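
As an illustration (hypothetical values and function name), a ParallelConfiguration can be combined with
Stream.parallel. Per the serialization notes in the source above, callables sent to a process pool should be
module-level (picklable) functions rather than lambdas:

    from concurrent.futures import ProcessPoolExecutor

    from dev4py.utils.stream import ParallelConfiguration, Stream


    def square(value: int) -> int:
        # Module-level mapper: picklable, so it can be shipped to worker processes
        return value * value


    if __name__ == "__main__":
        with ProcessPoolExecutor() as executor:
            config: ParallelConfiguration = ParallelConfiguration(executor=executor, chunksize=2)
            results: list[int] = Stream.of(1, 2, 3, 4, 5).parallel(config).map(square).to_list()
            print(results)  # e.g. [1, 4, 9, 16, 25]; order is not guaranteed unless ordered_execution() is used
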
class Stream(typing.Generic[~T]):
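
A short, illustrative construction sketch (hypothetical values) using the factory methods and operations shown in the
source above:

    from dev4py.utils.stream import Stream

    Stream.empty().count()                                   # 0
    Stream.of_iterable(range(5)).any_match(lambda v: v > 3)  # True
    Stream.of(1, 2, 2, 3, 3, 3).distinct().to_list()         # [1, 2, 3]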
 302class Stream(Generic[T]):  # pylint: disable=R0904
 303    __CREATE_KEY: Final[object] = object()
 304
 305    @classmethod
 306    def empty(cls) -> Stream[T]:
 307        """
 308        Returns an empty sequential Stream
 309
 310        Returns:
 311            Stream[T]: An empty sequential Stream
 312        """
 313        return cls.of()
 314
 315    @classmethod
 316    def of(cls, *values: T) -> Stream[T]:
 317        """
 318        Returns a sequential ordered stream whose elements are the specified values
 319
 320        Args:
 321            *values: ordered stream elements
 322
 323        Returns:
 324            Stream[T]: A sequential ordered stream whose elements are the specified values
 325        """
 326        return cls.of_iterable(values)
 327
 328    @classmethod
 329    def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]:
 330        """
 331        Returns a sequential ordered stream whose elements are values from the given Iterable
 332
 333        Args:
 334            iterable: iterable of stream source values
 335
 336        Returns:
 337            Stream[T]: A sequential ordered stream whose elements are values from the given Iterable
 338
 339        Raises:
 340            TypeError: if iterable is None
 341        """
 342        require_non_none(iterable)
 343        return cls._of(lambda p, o: iterable)
 344
 345    @classmethod
 346    def _of(
 347            cls,
 348            values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]],
 349            pipeline: Optional[StepPipeline[V, R]] = None,
 350            parallel_config: Optional[JOptional[ParallelConfiguration]] = None,
 351            ordered_execution: bool = False
 352    ) -> Stream[R]:
 353        """
 354        Private class method in order to simplify the call to Stream private constructor
 355
 356        Args:
 357            values_function: The BiFunction which provides the Stream values
 358            pipeline: The StepPipeline to be executed if presents by the Stream values on terminal operation
 359            parallel_config: The Stream parallel configuration
 360            ordered_execution: TRUE if the value must be returned to the encounter order
 361
 362        Returns:
 363            Stream[T]: A Stream corresponding to the given parameters
 364        """
 365        return Stream(
 366            values_function=values_function,
 367            pipeline=require_non_none_else_get(pipeline, _root_pipeline),
 368            parallel_config=require_non_none_else_get(parallel_config, JOptional.empty),
 369            ordered_execution=ordered_execution,
 370            create_key=cls.__CREATE_KEY
 371        )
 372
 373    def __init__(  # pylint: disable=R0913
 374            self,
 375            values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]],
 376            pipeline: StepPipeline[V, T],
 377            parallel_config: JOptional[ParallelConfiguration],
 378            ordered_execution: bool,
 379            create_key: object,
 380    ):
 381        """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`"""
 382        assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of"
 383        self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \
 384            require_non_none(values_function)
 385        self._pipeline: StepPipeline[V, T] = require_non_none(pipeline)
 386        self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config)
 387        self._ordered_execution: bool = require_non_none(ordered_execution)
 388
 389    def map(self, mapper: Function[T, R]) -> Stream[R]:
 390        """
 391        Returns a stream consisting of the results of applying the given function to the elements of this stream
 392
 393        Args:
 394            mapper (Function[T, R]): The function to apply to each element
 395
 396        Returns:
 397            Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream
 398
 399        Raises:
 400            TypeError: if mapper is None
 401        """
 402        require_non_none(mapper)
 403        return Stream._of(
 404            values_function=self._values_function,
 405            # pylint: disable=E1101
 406            pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))),
 407            parallel_config=self._parallel_config,
 408            ordered_execution=self._ordered_execution
 409        )
 410
 411    def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self:
 412        """
 413        Configures the Stream to use the given parallel configuration and return the current stream
 414
 415        Note: None parallel configuration is like calling `sequential()` method
 416
 417        Args:
 418            parallel_config: The parallel configuration to use
 419
 420        Returns:
 421            Stream[T]: The current Stream after set parallel configuration
 422        """
 423        self._parallel_config = JOptional.of_noneable(parallel_config)
 424        return self
 425
 426    def is_parallel(self) -> bool:
 427        """
 428        Returns whether this stream, if a terminal operation were to be executed, would execute in parallel
 429
 430        Returns:
 431            bool: True if the Stream uses parallel configuration, otherwise False
 432        """
 433        return self._parallel_config.is_present()
 434
 435    def sequential(self) -> Self:
 436        """
 437        Configures the Stream to use sequential configuration and return the current stream
 438        (i.e.: Remove the parallel configuration if exists)
 439
 440        Returns:
 441            Stream[T]: The current Stream after set sequential configuration
 442        """
 443        if self.is_parallel():
 444            self._parallel_config = JOptional.empty()
 445        return self
 446
 447    def filter(self, predicate: Predicate[T]) -> Stream[T]:
 448        """
 449        Returns a stream consisting of the elements of this stream that match the given predicate
 450
 451        Args:
 452            predicate: The predicate to apply to each element to determine if it should be included
 453
 454        Returns:
 455            Stream[T]: A stream consisting of the elements of this stream that match the given predicate
 456
 457        Raises:
 458            TypeError: if predicate is None
 459        """
 460        return Stream._of(
 461            values_function=self._values_function,
 462            # pylint: disable=E1101
 463            pipeline=self._pipeline.add_handler(
 464                cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate)))
 465            ),
 466            parallel_config=self._parallel_config,
 467            ordered_execution=self._ordered_execution
 468        )
 469
 470    def collect(self, collector: Collector[T, R]) -> R:
 471        """
 472        Performs a reduction operation on the elements of this stream using a Collector
 473
 474        Args:
 475            collector: The Collector describing the reduction
 476
 477        Returns:
 478            R: The result of the reduction
 479
 480        Raises:
 481            TypeError: if collector is None
 482        """
 483        return self._execute(collector=require_non_none(collector))
 484
 485    def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R:
 486        """
 487        Performs a reduction operation on the elements of this stream
 488
 489        Args:
 490            supplier: A function that creates a new result container. For a parallel execution, this function may be
 491                called multiple times and must return a fresh value each time
 492            accumulator: A function that must fold an element into a result container
 493            combiner: A unction that accepts two partial result containers and merges them, which must be compatible
 494                with the accumulator function. The combiner function must fold the elements from the second result
 495                container into the first result container
 496
 497        Returns:
 498            R: The result of the reduction
 499
 500        Raises:
 501            TypeError: if at least one parameter is None
 502        """
 503        return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner))
 504
 505    def to_list(self) -> list[T]:
 506        """
 507        Accumulates the elements of this stream into a list
 508
 509        Returns:
 510            list[T]: A list containing the stream elements
 511        """
 512        return self.collect(collector=collectors.to_list())
 513
 514    def to_tuple(self) -> tuple[T, ...]:
 515        """
 516        Accumulates the elements of this stream into a tuple
 517
 518        Returns:
 519            tuple[T, ...]: a tuple containing the stream elements
 520        """
 521        return self.collect(collector=collectors.to_tuple())
 522
 523    def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]:
 524        """
 525        Accumulates the elements of this stream into a tuple dict whose keys and values are the result of applying the
 526        provided mapping functions to the input elements
 527
 528        Args:
 529            key_mapper: a mapping function to produce keys
 530            value_mapper: a mapping function to produce values
 531
 532        Returns:
 533            dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping
 534                functions to the input elements
 535        """
 536        return self.collect(collector=collectors.to_dict(key_mapper, value_mapper))
 537
 538    def all_match(self, predicate: Predicate[T]) -> bool:
 539        """
 540        Returns whether all elements of this stream match the provided predicate
 541
 542        Args:
 543            predicate: The predicate to apply to elements of this stream
 544
 545        Returns:
 546            bool: True if either all elements of the stream match the provided predicate or the stream is empty,
 547                otherwise False
 548
 549        Raises:
 550            TypeError: if predicate is None
 551        """
 552        return self.map(predicate).reduce(True, _and_lambda)
 553
 554    def any_match(self, predicate: Predicate[T]) -> bool:
 555        """
 556        Returns whether any elements of this stream match the provided predicate
 557
 558        Args:
 559            predicate: The predicate to apply to elements of this stream
 560
 561        Returns:
 562            bool: True if any elements of the stream match the provided predicate, otherwise False
 563
 564        Raises:
 565            TypeError: if predicate is None
 566        """
 567        return self.filter(predicate).find_any().is_present()
 568
 569    def count(self) -> int:
 570        """
 571        Returns the count of elements in this stream
 572
 573        Returns:
 574            int: The count of elements in this stream
 575        """
 576        return self.collect(collector=collectors.to_counter())
 577
 578    def for_each(self, consumer: Consumer[T]) -> None:
 579        """
 580        Performs an action (/consumer) for each element of this stream
 581
 582        Args:
 583            consumer: The Consumer to perform on the elements
 584
 585        Returns:
 586            None: nothing
 587
 588        Raises:
 589            TypeError: if consumer is None
 590        """
 591        self.peek(require_non_none(consumer))
 592        self._execute()
 593
 594    def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]:
 595        """
 596        Returns a stream consisting of the results of replacing each element of this stream with the contents of a
 597        mapped stream produced by applying the provided mapping function to each element
 598
 599        Args:
 600            mapper: A function to apply to each element which produces a stream of new values
 601
 602        Returns:
 603             Stream[R]: The new Stream
 604
 605        Raises:
 606            TypeError: if mapper is None
 607        """
 608        return self._from_self_config(
 609            values_function=cast(
 610                BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]],
 611                partial(self._flat_map_values_function, mapper=require_non_none(mapper))
 612            )
 613        )
 614
 615    def ordered_execution(self, ordered: bool = True) -> Stream[T]:
 616        """
 617        Configures the current Stream to be ordered or not and returns it
 618
 619        Note: This configuration has no effect on a sequential Stream
 620
 621        Args:
 622            ordered: True if the Stream must be ordered, False for unordered. (Default: True)
 623
 624        Returns:
 625            Stream[T]: The current Stream with ordered configuration
 626
 627        Raises:
 628            TypeError: if ordered is None
 629        """
 630        return Stream._of(
 631            values_function=self._values_function,
 632            pipeline=self._pipeline,
 633            parallel_config=self._parallel_config,
 634            ordered_execution=ordered
 635        ) if self._ordered_execution != require_non_none(ordered) else self
 636
 637    def unordered(self) -> Stream[T]:
 638        """
 639        Configures the current Stream to be unordered and returns it
 640
 641        Note: Equivalent to calling `ordered_execution(False)`
 642        Note 2: This configuration has no effect on a sequential Stream
 643
 644        Returns:
 645            Stream[T]: The current Stream with unordered configuration
 646        """
 647        return self.ordered_execution(False)
 648
 649    def find_first(self) -> JOptional[T]:
 650        """
 651        Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty
 652
 653        Returns:
 654            JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream
 655                is empty
 656        """
 657        return self.ordered_execution(True).find_any()
 658
 659    def find_any(self) -> JOptional[T]:
 660        """
 661        Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty
 662
 663        Note: if the stream is ordered, find_any and find_first are equivalent
 664
 665        Returns:
 666            JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is
 667                empty
 668        """
 669        results: list[T] = self._execute(
 670            stop_on_first_completed=True  # pragma: no mutate
 671            , collector=collectors.to_list()
 672        )
 673        return JOptional.of_noneable(results[0] if results else None)
 674
 675    def distinct(self) -> Stream[T]:
 676        """
 677        Returns a stream consisting of the distinct elements of this stream
 678
 679        Returns:
 680            Stream[T]: A stream consisting of the distinct elements of this stream
 681        """
 682        return self._from_self_config(values_function=self._distinct_values_function)
 683
 684    def drop_while(self, predicate: Predicate[T]) -> Stream[T]:
 685        """
 686        Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping
 687        the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a
 688        stream consisting of the remaining elements of this stream after dropping a subset of elements that match the
 689        given predicate.
 690
 691        Args:
 692            predicate: The predicate to apply to elements to determine the longest prefix of elements
 693
 694        Returns:
 695            Stream[T]: The new Stream
 696
 697        Raises:
 698            TypeError: if predicate is None
 699        """
 700        return self._from_self_config(
 701            values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate))
 702        )
 703
 704    def limit(self, limit: int) -> Stream[T]:
 705        """
 706        Returns a stream consisting of the elements of this stream, truncated to be no longer than `limit` in length
 707
 708        Args:
 709            limit: The number of elements the stream should be limited to
 710
 711        Returns:
 712            Stream[T]: The truncated Stream
 713
 714        Raises:
 715            TypeError: if limit is None
 716        """
 717        return self._from_self_config(
 718            values_function=partial(self._limit_values_function, limit=require_non_none(limit)))
 719
 720    def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
 721        """
 722        Returns the maximum element of this stream according to the provided comparator BiFunction
 723
 724        Args:
 725            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 726                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 727                (default: natural comparator)
 728
 729        Returns:
 730            JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream
 731                is empty
 732
 733        Raises:
 734            TypeError: if comparator is None
 735        """
 736        return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator)))
 737
 738    def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
 739        """
 740        Returns the minimum element of this stream according to the provided comparator BiFunction
 741
 742        Args:
 743            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 744                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 745                (default: natural comparator)
 746
 747        Returns:
 748            JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream
 749                is empty
 750
 751        Raises:
 752            TypeError: if comparator is None
 753        """
 754        return self.sorted(require_non_none(comparator)).find_first()
 755
 756    def peek(self, consumer: Consumer[T]) -> Stream[T]:
 757        """
 758        Returns a stream consisting of the elements of this stream, additionally performing the provided action
 759        (/consumer) on each element as elements are consumed from the resulting stream
 760
 761        Args:
 762            consumer: The Consumer to perform on the elements as they are consumed from the stream
 763
 764        Returns:
 765            Stream[T]: The new Stream
 766
 767        Raises:
 768            TypeError: if consumer is None
 769        """
 770        return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer))))
 771
 772    def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T:
 773        """
 774        Performs a reduction on the elements of this stream, using the provided identity value and an associative
 775        accumulation function, and returns the reduced value
 776
 777        Args:
 778            identity: The identity value for the accumulating function
 779            accumulator: The BiFunction for combining two values
 780
 781        Returns:
 782            T: The result of the reduction
 783
 784        Raises:
 785            TypeError: if accumulator is None
 786        """
 787        return self.collect(collectors.of(supplier=partial(to_self, obj=identity),
 788                                          accumulator=accumulator,
 789                                          combiner=accumulator
 790                                          )
 791                            )
 792
 793    def skip(self, n: int) -> Stream[T]:
 794        """
 795        Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of
 796        the stream
 797
 798        Note: If this stream contains fewer than n elements then an empty stream will be returned
 799
 800        Args:
 801            n: The number of leading elements to skip
 802
 803        Returns:
 804            Stream[T]: The new stream
 805
 806        Raises:
 807            TypeError: if n is None
 808
 809        """
 810        return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n)))
 811
 812    def take_while(self, predicate: Predicate[T]) -> Stream[T]:
 813        """
 814        Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream
 815        that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset
 816        of elements taken from this stream that match the given predicate
 817
 818        Args:
 819            predicate: The predicate to apply to elements to determine the longest prefix of elements
 820
 821        Returns:
 822            Stream[T]: The new Stream
 823
 824        Raises:
 825            TypeError: if predicate is None
 826        """
 827        return self._from_self_config(
 828            values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate))
 829        )
 830
 831    def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]:
 832        """
 833        Returns a stream consisting of the elements of this stream, sorted according to the provided comparator
 834
 835        Args:
 836            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
 837                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
 838                (default: natural comparator)
 839
 840        Returns:
 841            Stream[T]: The new Stream
 842
 843        Raises:
 844            TypeError: if comparator is None
 845        """
 846        self._ordered_execution = True
 847        return self._from_self_config(
 848            values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator))
 849        )
 850
 851    def to_generator(self) -> Iterator[T]:
 852        """
 853        Returns a generator based on Stream inputs and mapping
 854
 855        Note: If the Stream is sequential, each value is processed when __next__() is called. Otherwise, if the Stream
 856        uses a parallel configuration, all values are processed at the first __next__() call but each value is returned
 857        one by one depending on the 'ordered/unordered' configuration
 858
 859        Note 2: When the stream uses a parallel unordered configuration, each value is returned as soon as it is
 860        available. With an ordered configuration, the generator waits for each value in order to respect the stream order
 861
 862        Returns:
 863            Iterator[T]: The Stream processed values generator
 864        """
 865        # pylint: disable=E1102
 866        values: Final[Iterable[Any]] = \
 867            self._values_function(self._parallel_config.or_else(None), self._ordered_execution)
 868        if not values:
 869            return
 870
 871        if self.is_parallel():
 872            yield from _parallel_generator(
 873                values=values,
 874                pipeline=self._pipeline,
 875                parallel_config=self._parallel_config.get(),
 876                ordered_execution=self._ordered_execution
 877            )
 878        else:
 879            yield from _sync_generator(values=values, pipeline=self._pipeline)
 880
 881    def _execute(
 882            self,
 883            stop_on_first_completed: bool = False,
 884            collector: Collector[T, R] = _none_collector()
 885    ) -> R:
 886        """
 887        private method to execute the current Stream
 888        """
 889        require_non_none(stop_on_first_completed)
 890        require_non_none(collector)
 891        # pylint: disable=E1102
 892        values: Final[Optional[Iterable[Any]]] = \
 893            self._values_function(self._parallel_config.or_else(None), self._ordered_execution)
 894
 895        if not values:
 896            return collector.supplier()
 897
 898        if self.is_parallel():
 899            return _parallel_execution(
 900                values=values,
 901                pipeline=self._pipeline,
 902                parallel_config=self._parallel_config.get(),
 903                ordered_execution=self._ordered_execution,
 904                stop_on_first_completed=stop_on_first_completed,
 905                collector=collector
 906            )
 907
 908        return _sync_execution(
 909            values=values, pipeline=self._pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector
 910        )
 911
 912    def _from_self_config(
 913            self, values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]]
 914    ) -> Stream[R]:
 915        """
 916        private method that returns a new Stream by copying the current configuration
 917        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 918        """
 919        return Stream._of(
 920            values_function=values_function,
 921            parallel_config=self._parallel_config,
 922            ordered_execution=self._ordered_execution
 923        )
 924
 925    def _init_to_values_function(
 926            self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool
 927    ) -> Stream[T]:
 928        """
 929        private method that initializes the current stream with the given configuration
 930        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 931        """
 932        return self.ordered_execution(ordered=ordered_execution).parallel(parallel_config)
 933
 934    def _sorted_values_function(
 935            self, parallel_config: ParallelConfiguration, ordered_execution: bool, comparator: BiFunction[T, T, int]
 936    ) -> Iterable[T]:
 937        """
 938        private method used by `sorted` public method in replacement of lambda
 939        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 940        """
 941        values: list[T] = self._init_to_values_function(parallel_config, ordered_execution).to_list()
 942        values.sort(key=cmp_to_key(comparator))
 943        return values
 944
 945    def _flat_map_values_function(
 946            self, parallel_config: Optional[ParallelConfiguration],
 947            ordered_execution: bool,
 948            mapper: Function[T, Stream[R]]
 949    ) -> Iterable[R]:
 950        """
 951        private method used by `flat_map` public method in replacement of lambda
 952        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 953        """
 954        list_generator: Final[Iterable[list[R]]] = (
 955            self
 956            ._init_to_values_function(parallel_config, ordered_execution)
 957            .map(mapper)
 958            .map(_stream_to_list_lambda)  # type: ignore
 959            .to_generator()
 960        )
 961        for values in list_generator:
 962            yield from values
 963
 964    def _distinct_values_function(
 965            self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool
 966    ) -> Iterable[T]:
 967        """
 968        private method used by `distinct` public method in replacement of lambda
 969        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 970        """
 971        existing_values: set[T] = set()
 972        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
 973            if value not in existing_values:
 974                existing_values.add(value)
 975                yield value
 976
 977    def _drop_while_values_function(
 978            self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T]
 979    ) -> Iterable[T]:
 980        """
 981        private method used by `drop_while` public method in replacement of lambda
 982        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 983        """
 984        drop: bool = True
 985        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
 986            drop = drop and predicate(value)
 987            if drop is False:
 988                yield value
 989
 990    def _limit_values_function(
 991            self, parallel_config: ParallelConfiguration, ordered_execution: bool, limit: int
 992    ) -> Iterable[T]:
 993        """
 994        private method used by `limit` public method in replacement of lambda
 995        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
 996        """
 997        if limit <= 0:
 998            return
 999
1000        counter: int = 0
1001        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1002            yield value
1003            counter += 1
1004            if counter >= limit:
1005                break
1006
1007    def _skip_values_function(
1008            self, parallel_config: ParallelConfiguration, ordered_execution: bool, n: int
1009    ) -> Iterable[T]:
1010        """
1011        private method used by `skip` public method in replacement of lambda
1012        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
1013        """
1014        counter: int = 0
1015        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1016            if counter >= n:
1017                yield value
1018            counter += 1
1019
1020    def _take_while_values_function(
1021            self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T]
1022    ) -> Iterable[T]:
1023        """
1024        private method used by `take_while` public method in replacement of lambda
1025        Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable)
1026        """
1027        for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator():
1028            if not predicate(value):
1029                break  # pragma: no mutate
1030            yield value


Stream( values_function: Callable[[Optional[ParallelConfiguration], bool], Iterable[~V]], pipeline: dev4py.utils.pipeline.step_pipeline.StepPipeline[~V, ~T], parallel_config: dev4py.utils.joptional.JOptional[ParallelConfiguration], ordered_execution: bool, create_key: object)
373    def __init__(  # pylint: disable=R0913
374            self,
375            values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]],
376            pipeline: StepPipeline[V, T],
377            parallel_config: JOptional[ParallelConfiguration],
378            ordered_execution: bool,
379            create_key: object,
380    ):
381        """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`"""
382        assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of"
383        self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \
384            require_non_none(values_function)
385        self._pipeline: StepPipeline[V, T] = require_non_none(pipeline)
386        self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config)
387        self._ordered_execution: bool = require_non_none(ordered_execution)

Stream private constructor: Constructs a Stream[T] inspired by java java.util.stream.Stream<T>

@classmethod
def empty(cls) -> Stream[~T]:
305    @classmethod
306    def empty(cls) -> Stream[T]:
307        """
308        Returns an empty sequential Stream
309
310        Returns:
311            Stream[T]: An empty sequential Stream
312        """
313        return cls.of()

Returns an empty sequential Stream

Returns:

Stream[T]: An empty sequential Stream

@classmethod
def of(cls, *values: ~T) -> Stream[~T]:
315    @classmethod
316    def of(cls, *values: T) -> Stream[T]:
317        """
318        Returns a sequential ordered stream whose elements are the specified values
319
320        Args:
321            *values: ordered stream elements
322
323        Returns:
324            Stream[T]: A sequential ordered stream whose elements are the specified values
325        """
326        return cls.of_iterable(values)

Returns a sequential ordered stream whose elements are the specified values

Arguments:
  • *values: ordered stream elements
Returns:

Stream[T]: A sequential ordered stream whose elements are the specified values

@classmethod
def of_iterable(cls, iterable: Iterable[~T]) -> Stream[~T]:
328    @classmethod
329    def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]:
330        """
331        Returns a sequential ordered stream whose elements are values from the given Iterable
332
333        Args:
334            iterable: iterable of stream source values
335
336        Returns:
337            Stream[T]: A sequential ordered stream whose elements are values from the given Iterable
338
339        Raises:
340            TypeError: if iterable is None
341        """
342        require_non_none(iterable)
343        return cls._of(lambda p, o: iterable)

Returns a sequential ordered stream whose elements are values from the given Iterable

Arguments:
  • iterable: iterable of stream source values
Returns:

Stream[T]: A sequential ordered stream whose elements are values from the given Iterable

Raises:
  • TypeError: if iterable is None
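
A short usage sketch (illustrative only; the import path dev4py.utils.stream is assumed from this module's name):

    from dev4py.utils.stream import Stream

    # Sequential ordered streams can be built from explicit values or from any Iterable
    from_values = Stream.of(1, 2, 3)
    from_iterable = Stream.of_iterable(range(5))
    print(from_iterable.to_list())  # [0, 1, 2, 3, 4]
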
def map(self, mapper: Callable[[~T], ~R]) -> Stream[~R]:
389    def map(self, mapper: Function[T, R]) -> Stream[R]:
390        """
391        Returns a stream consisting of the results of applying the given function to the elements of this stream
392
393        Args:
394            mapper (Function[T, R]): The function to apply to each element
395
396        Returns:
397            Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream
398
399        Raises:
400            TypeError: if mapper is None
401        """
402        require_non_none(mapper)
403        return Stream._of(
404            values_function=self._values_function,
405            # pylint: disable=E1101
406            pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))),
407            parallel_config=self._parallel_config,
408            ordered_execution=self._ordered_execution
409        )

Returns a stream consisting of the results of applying the given function to the elements of this stream

Arguments:
  • mapper (Function[T, R]): The function to apply to each element
Returns:

Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream

Raises:
  • TypeError: if mapper is None
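
For example (illustrative sketch; lambdas are fine for a sequential stream, the module itself only avoids them because they are not serializable for multiprocessing):

    from dev4py.utils.stream import Stream

    squares = Stream.of(1, 2, 3).map(lambda x: x * x).to_list()
    # squares == [1, 4, 9]
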
def parallel( self, parallel_config: Optional[ParallelConfiguration]) -> Self:
411    def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self:
412        """
413        Configures the Stream to use the given parallel configuration and returns the current stream
414
415        Note: None parallel configuration is like calling `sequential()` method
416
417        Args:
418            parallel_config: The parallel configuration to use
419
420        Returns:
421            Stream[T]: The current Stream after setting the parallel configuration
422        """
423        self._parallel_config = JOptional.of_noneable(parallel_config)
424        return self

Configures the Stream to use the given parallel configuration and returns the current stream

Note: None parallel configuration is like calling sequential() method

Arguments:
  • parallel_config: The parallel configuration to use
Returns:

Stream[T]: The current Stream after setting the parallel configuration

def is_parallel(self) -> bool:
426    def is_parallel(self) -> bool:
427        """
428        Returns whether this stream, if a terminal operation were to be executed, would execute in parallel
429
430        Returns:
431            bool: True if the Stream uses parallel configuration, otherwise False
432        """
433        return self._parallel_config.is_present()

Returns whether this stream, if a terminal operation were to be executed, would execute in parallel

Returns:

bool: True if the Stream uses parallel configuration, otherwise False

def sequential(self) -> Self:
435    def sequential(self) -> Self:
436        """
437        Configures the Stream to use sequential configuration and returns the current stream
438        (i.e.: removes the parallel configuration if it exists)
439
440        Returns:
441            Stream[T]: The current Stream after setting the sequential configuration
442        """
443        if self.is_parallel():
444            self._parallel_config = JOptional.empty()
445        return self

Configures the Stream to use sequential configuration and returns the current stream (i.e.: removes the parallel configuration if it exists)

Returns:

Stream[T]: The current Stream after setting the sequential configuration
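
A minimal sketch of the parallel/sequential switches (illustrative; no ParallelConfiguration is built here since a None configuration is equivalent to sequential()):

    from dev4py.utils.stream import Stream

    stream = Stream.of(1, 2, 3)
    assert not stream.is_parallel()                  # sequential by default
    assert not stream.parallel(None).is_parallel()   # a None configuration keeps it sequential
    stream.sequential()                              # removes the parallel configuration if one was set
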

def filter(self, predicate: Callable[[~T], bool]) -> Stream[~T]:
447    def filter(self, predicate: Predicate[T]) -> Stream[T]:
448        """
449        Returns a stream consisting of the elements of this stream that match the given predicate
450
451        Args:
452            predicate: The predicate to apply to each element to determine if it should be included
453
454        Returns:
455            Stream[T]: A stream consisting of the elements of this stream that match the given predicate
456
457        Raises:
458            TypeError: if predicate is None
459        """
460        return Stream._of(
461            values_function=self._values_function,
462            # pylint: disable=E1101
463            pipeline=self._pipeline.add_handler(
464                cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate)))
465            ),
466            parallel_config=self._parallel_config,
467            ordered_execution=self._ordered_execution
468        )

Returns a stream consisting of the elements of this stream that match the given predicate

Arguments:
  • predicate: The predicate to apply to each element to determine if it should be included
Returns:

Stream[T]: A stream consisting of the elements of this stream that match the given predicate

Raises:
  • TypeError: if predicate is None
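
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    evens = Stream.of(1, 2, 3, 4, 5).filter(lambda x: x % 2 == 0).to_list()
    # evens == [2, 4]
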
def collect(self, collector: dev4py.utils.collectors.Collector[~T, ~R]) -> ~R:
470    def collect(self, collector: Collector[T, R]) -> R:
471        """
472        Performs a reduction operation on the elements of this stream using a Collector
473
474        Args:
475            collector: The Collector describing the reduction
476
477        Returns:
478            R: The result of the reduction
479
480        Raises:
481            TypeError: if collector is None
482        """
483        return self._execute(collector=require_non_none(collector))

Performs a reduction operation on the elements of this stream using a Collector

Arguments:
  • collector: The Collector describing the reduction
Returns:

R: The result of the reduction

Raises:
  • TypeError: if collector is None
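
For example, with one of the provided collectors (illustrative sketch; the collectors module is imported from dev4py.utils as in this module's source):

    from dev4py.utils import collectors
    from dev4py.utils.stream import Stream

    as_tuple = Stream.of("a", "b", "c").collect(collectors.to_tuple())
    # as_tuple == ('a', 'b', 'c')
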
def collect_from( self, supplier: Callable[[], ~R], accumulator: Callable[[~R, ~T], NoneType], combiner: Callable[[~R, ~R], NoneType]) -> ~R:
485    def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R:
486        """
487        Performs a reduction operation on the elements of this stream
488
489        Args:
490            supplier: A function that creates a new result container. For a parallel execution, this function may be
491                called multiple times and must return a fresh value each time
492            accumulator: A function that must fold an element into a result container
493            combiner: A function that accepts two partial result containers and merges them, which must be compatible
494                with the accumulator function. The combiner function must fold the elements from the second result
495                container into the first result container
496
497        Returns:
498            R: The result of the reduction
499
500        Raises:
501            TypeError: if at least one parameter is None
502        """
503        return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner))

Performs a reduction operation on the elements of this stream

Arguments:
  • supplier: A function that creates a new result container. For a parallel execution, this function may be called multiple times and must return a fresh value each time
  • accumulator: A function that must fold an element into a result container
  • combiner: A function that accepts two partial result containers and merges them, which must be compatible with the accumulator function. The combiner function must fold the elements from the second result container into the first result container
Returns:

R: The result of the reduction

Raises:
  • TypeError: if at least one parameter is None
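
For example, collecting into a list without an explicit Collector (illustrative sketch; list, list.append and list.extend happen to match the supplier/accumulator/combiner contracts described above):

    from dev4py.utils.stream import Stream

    result = Stream.of(1, 2, 3).collect_from(
        supplier=list,            # creates a new result container
        accumulator=list.append,  # folds an element into a container
        combiner=list.extend,     # folds the second container into the first one
    )
    # result == [1, 2, 3]
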
def to_list(self) -> list[~T]:
505    def to_list(self) -> list[T]:
506        """
507        Accumulates the elements of this stream into a list
508
509        Returns:
510            list[T]: A list containing the stream elements
511        """
512        return self.collect(collector=collectors.to_list())

Accumulates the elements of this stream into a list

Returns:

list[T]: A list containing the stream elements

def to_tuple(self) -> tuple[~T, ...]:
514    def to_tuple(self) -> tuple[T, ...]:
515        """
516        Accumulates the elements of this stream into a tuple
517
518        Returns:
519            tuple[T, ...]: a tuple containing the stream elements
520        """
521        return self.collect(collector=collectors.to_tuple())

Accumulates the elements of this stream into a tuple

Returns:

tuple[T, ...]: a tuple containing the stream elements

def to_dict( self, key_mapper: Callable[[~T], ~K], value_mapper: Callable[[~T], ~V]) -> dict[~K, ~V]:
523    def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]:
524        """
525        Accumulates the elements of this stream into a dict whose keys and values are the result of applying the
526        provided mapping functions to the input elements
527
528        Args:
529            key_mapper: a mapping function to produce keys
530            value_mapper: a mapping function to produce values
531
532        Returns:
533            dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping
534                functions to the input elements
535        """
536        return self.collect(collector=collectors.to_dict(key_mapper, value_mapper))

Accumulates the elements of this stream into a dict whose keys and values are the result of applying the provided mapping functions to the input elements

Arguments:
  • key_mapper: a mapping function to produce keys
  • value_mapper: a mapping function to produce values
Returns:

dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping functions to the input elements
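
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    lengths = Stream.of("a", "bb", "ccc").to_dict(str.upper, len)
    # lengths == {'A': 1, 'BB': 2, 'CCC': 3}
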

def all_match(self, predicate: Callable[[~T], bool]) -> bool:
538    def all_match(self, predicate: Predicate[T]) -> bool:
539        """
540        Returns whether all elements of this stream match the provided predicate
541
542        Args:
543            predicate: The predicate to apply to elements of this stream
544
545        Returns:
546            bool: True if either all elements of the stream match the provided predicate or the stream is empty,
547                otherwise False
548
549        Raises:
550            TypeError: if predicate is None
551        """
552        return self.map(predicate).reduce(True, _and_lambda)

Returns whether all elements of this stream match the provided predicate

Arguments:
  • predicate: The predicate to apply to elements of this stream
Returns:

bool: True if either all elements of the stream match the provided predicate or the stream is empty, otherwise False

Raises:
  • TypeError: if predicate is None
def any_match(self, predicate: Callable[[~T], bool]) -> bool:
554    def any_match(self, predicate: Predicate[T]) -> bool:
555        """
556        Returns whether any elements of this stream match the provided predicate
557
558        Args:
559            predicate: The predicate to apply to elements of this stream
560
561        Returns:
562            bool: True if any elements of the stream match the provided predicate, otherwise False
563
564        Raises:
565            TypeError: if predicate is None
566        """
567        return self.filter(predicate).find_any().is_present()

Returns whether any elements of this stream match the provided predicate

Arguments:
  • predicate: The predicate to apply to elements of this stream
Returns:

bool: True if any elements of the stream match the provided predicate, otherwise False

Raises:
  • TypeError: if predicate is None
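
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    assert Stream.of(2, 4, 6).all_match(lambda x: x % 2 == 0)  # every element is even
    assert Stream.of(1, 2, 3).any_match(lambda x: x % 2 == 0)  # at least one element is even
    assert Stream.empty().all_match(lambda x: False)           # an empty stream satisfies all_match
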
def count(self) -> int:
569    def count(self) -> int:
570        """
571        Returns the count of elements in this stream
572
573        Returns:
574            int: The count of elements in this stream
575        """
576        return self.collect(collector=collectors.to_counter())

Returns the count of elements in this stream

Returns:

int: The count of elements in this stream

def for_each(self, consumer: Callable[[~T], NoneType]) -> None:
578    def for_each(self, consumer: Consumer[T]) -> None:
579        """
580        Performs an action (/consumer) for each element of this stream
581
582        Args:
583            consumer: The Consumer to perform on the elements
584
585        Returns:
586            None: nothing
587
588        Raises:
589            TypeError: if consumer is None
590        """
591        self.peek(require_non_none(consumer))
592        self._execute()

Performs an action (/consumer) for each element of this stream

Arguments:
  • consumer: The Consumer to perform on the elements
Returns:

None: nothing

Raises:
  • TypeError: if consumer is None
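
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    Stream.of("a", "b", "c").map(str.upper).for_each(print)
    # prints A, B and C, one per line
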
def flat_map( self, mapper: Callable[[~T], Stream[~R]]) -> Stream[~R]:
594    def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]:
595        """
596        Returns a stream consisting of the results of replacing each element of this stream with the contents of a
597        mapped stream produced by applying the provided mapping function to each element
598
599        Args:
600            mapper: A function to apply to each element which produces a stream of new values
601
602        Returns:
603             Stream[R]: The new Stream
604
605        Raises:
606            TypeError: if mapper is None
607        """
608        return self._from_self_config(
609            values_function=cast(
610                BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]],
611                partial(self._flat_map_values_function, mapper=require_non_none(mapper))
612            )
613        )

Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element

Arguments:
  • mapper: A function to apply to each element which produces a stream of new values
Returns:

Stream[R]: The new Stream

Raises:
  • TypeError: if mapper is None
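
For example, flattening nested lists (illustrative sketch):

    from dev4py.utils.stream import Stream

    flat = Stream.of([1, 2], [3], [4, 5]).flat_map(Stream.of_iterable).to_list()
    # flat == [1, 2, 3, 4, 5]
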
def ordered_execution(self, ordered: bool = True) -> Stream[~T]:
615    def ordered_execution(self, ordered: bool = True) -> Stream[T]:
616        """
617        Configures the current Stream to be ordered or not and returns it
618
619        Note: This configuration has no effect on a sequential Stream
620
621        Args:
622            ordered: True if the Stream must be ordered, False for unordered. (Default: True)
623
624        Returns:
625            Stream[T]: The current Stream with ordered configuration
626
627        Raises:
628            TypeError: if ordered is None
629        """
630        return Stream._of(
631            values_function=self._values_function,
632            pipeline=self._pipeline,
633            parallel_config=self._parallel_config,
634            ordered_execution=ordered
635        ) if self._ordered_execution != require_non_none(ordered) else self

Configures the current Stream to be ordered or not and returns it

Note: This configuration has no effect on a sequential Stream

Arguments:
  • ordered: True if the Stream must be ordered, False for unordered. (Default: True)
Returns:

Stream[T]: The current Stream with ordered configuration

Raises:
  • TypeError: if ordered is None
def unordered(self) -> Stream[~T]:
637    def unordered(self) -> Stream[T]:
638        """
639        Configures the current Stream to be unordered and returns it
640
641        Note: Equivalent to calling `ordered_execution(False)`
642        Note 2: This configuration has no effect on a sequential Stream
643
644        Returns:
645            Stream[T]: The current Stream with unordered configuration
646        """
647        return self.ordered_execution(False)

Configures the current Stream to be unordered and returns it

Note: Equivalent to calling ordered_execution(False). Note 2: This configuration has no effect on a sequential Stream

Returns:

Stream[T]: The current Stream with unordered configuration

def find_first(self) -> dev4py.utils.joptional.JOptional[~T]:
649    def find_first(self) -> JOptional[T]:
650        """
651        Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty
652
653        Returns:
654            JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream
655                is empty
656        """
657        return self.ordered_execution(True).find_any()

Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty

Returns:

JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream is empty

def find_any(self) -> dev4py.utils.joptional.JOptional[~T]:
659    def find_any(self) -> JOptional[T]:
660        """
661        Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty
662
663        Note: if the stream is ordered, find_any and find_first are equivalent
664
665        Returns:
666            JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is
667                empty
668        """
669        results: list[T] = self._execute(
670            stop_on_first_completed=True  # pragma: no mutate
671            , collector=collectors.to_list()
672        )
673        return JOptional.of_noneable(results[0] if results else None)

Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty

Note: if the stream is ordered, find_any and find_first are equivalent

Returns:

JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is empty
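
For example (illustrative sketch; the result is a JOptional, so a default can be supplied with or_else):

    from dev4py.utils.stream import Stream

    first_even = Stream.of(1, 3, 4, 6).filter(lambda x: x % 2 == 0).find_first()
    assert first_even.is_present() and first_even.get() == 4

    missing = Stream.of(1, 3).filter(lambda x: x % 2 == 0).find_any().or_else(-1)
    # missing == -1
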

def distinct(self) -> Stream[~T]:
675    def distinct(self) -> Stream[T]:
676        """
677        Returns a stream consisting of the distinct elements of this stream
678
679        Returns:
680            Stream[T]: A stream consisting of the distinct elements of this stream
681        """
682        return self._from_self_config(values_function=self._distinct_values_function)

Returns a stream consisting of the distinct elements of this stream

Returns:

Stream[T]: A stream consisting of the distinct elements of this stream

def drop_while(self, predicate: Callable[[~T], bool]) -> Stream[~T]:
684    def drop_while(self, predicate: Predicate[T]) -> Stream[T]:
685        """
686        Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping
687        the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a
688        stream consisting of the remaining elements of this stream after dropping a subset of elements that match the
689        given predicate.
690
691        Args:
692            predicate: The predicate to apply to elements to determine the longest prefix of elements
693
694        Returns:
695            Stream[T]: The new Stream
696
697        Raises:
698            TypeError: if predicate is None
699        """
700        return self._from_self_config(
701            values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate))
702        )

Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of the remaining elements of this stream after dropping a subset of elements that match the given predicate.

Arguments:
  • predicate: The predicate to apply to elements to determine the longest prefix of elements
Returns:

Stream[T]: The new Stream

Raises:
  • TypeError: if predicate is None
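
For example, on an ordered stream (illustrative sketch):

    from dev4py.utils.stream import Stream

    remaining = Stream.of(1, 2, 3, 4, 1).drop_while(lambda x: x < 3).to_list()
    # remaining == [3, 4, 1]: once the predicate fails, no further element is dropped
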
def limit(self, limit: int) -> Stream[~T]:
704    def limit(self, limit: int) -> Stream[T]:
705        """
706        Returns a stream consisting of the elements of this stream, truncated to be no longer than `limit` in length
707
708        Args:
709            limit: The number of elements the stream should be limited to
710
711        Returns:
712            Stream[T]: The truncated Stream
713
714        Raises:
715            TypeError: if limit is None
716        """
717        return self._from_self_config(
718            values_function=partial(self._limit_values_function, limit=require_non_none(limit)))

Returns a stream consisting of the elements of this stream, truncated to be no longer than the given limit in length

Arguments:
  • limit: The number of elements the stream should be limited to
Returns:

Stream[T]: The truncated Stream

Raises:
  • TypeError: if limit is None
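
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    first_three = Stream.of_iterable(range(100)).limit(3).to_list()
    # first_three == [0, 1, 2]
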
def max( self, comparator: Callable[[~T, ~T], int] = <function _natural_comparator>) -> dev4py.utils.joptional.JOptional[~T]:
720    def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
721        """
722        Returns the maximum element of this stream according to the provided comparator BiFunction
723
724        Args:
725            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
726                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
727                (default: natural comparator)
728
729        Returns:
730            JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream
731                is empty
732
733        Raises:
734            TypeError: if comparator is None
735        """
736        return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator)))

Returns the maximum element of this stream according to the provided comparator BiFunction

Arguments:
  • comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:

JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream is empty

Raises:
  • TypeError: if comparator is None
def min( self, comparator: Callable[[~T, ~T], int] = <function _natural_comparator>) -> dev4py.utils.joptional.JOptional[~T]:
738    def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]:
739        """
740        Returns the minimum element of this stream according to the provided comparator BiFunction
741
742        Args:
743            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
744                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
745                (default: natural comparator)
746
747        Returns:
748            JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream
749                is empty
750
751        Raises:
752            TypeError: if comparator is None
753        """
754        return self.sorted(require_non_none(comparator)).find_first()

Returns the minimum element of this stream according to the provided comparator BiFunction

Arguments:
  • comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:

JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream is empty

Raises:
  • TypeError: if comparator is None
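
For example, with the default natural comparator and with a custom one (illustrative sketch):

    from dev4py.utils.stream import Stream

    assert Stream.of(3, 1, 2).min().get() == 1
    assert Stream.of(3, 1, 2).max().get() == 3

    # Custom comparator: compare words by their length
    longest = Stream.of("aa", "b", "ccc").max(lambda a, b: len(a) - len(b))
    # longest.get() == 'ccc'
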
def peek( self, consumer: Callable[[~T], NoneType]) -> Stream[~T]:
756    def peek(self, consumer: Consumer[T]) -> Stream[T]:
757        """
758        Returns a stream consisting of the elements of this stream, additionally performing the provided action
759        (/consumer) on each element as elements are consumed from the resulting stream
760
761        Args:
762            consumer: The Consumer to perform on the elements as they are consumed from the stream
763
764        Returns:
765            Stream[T]: The new Stream
766
767        Raises:
768            TypeError: if consumer is None
769        """
770        return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer))))

Returns a stream consisting of the elements of this stream, additionally performing the provided action (/consumer) on each element as elements are consumed from the resulting stream

Arguments:
  • consumer: The Consumer to perform on the elements as they are consumed from the stream
Returns:

Stream[T]: The new Stream

Raises:
  • TypeError: if consumer is None
def reduce(self, identity: ~T, accumulator: Callable[[~T, ~T], ~T]) -> ~T:
772    def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T:
773        """
774        Performs a reduction on the elements of this stream, using the provided identity value and an associative
775        accumulation function, and returns the reduced value
776
777        Args:
778            identity: The identity value for the accumulating function
779            accumulator: The BiFunction for combining two values
780
781        Returns:
782            T: The result of the reduction
783
784        Raises:
785            TypeError: if accumulator is None
786        """
787        return self.collect(collectors.of(supplier=partial(to_self, obj=identity),
788                                          accumulator=accumulator,
789                                          combiner=accumulator
790                                          )
791                            )

Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value

Arguments:
  • identity: The identity value for the accumulating function
  • accumulator: The BiFunction for combining two values
Returns:

T: The result of the reduction

Raises:
  • TypeError: if accumulator is None
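
For example, summing the elements (illustrative sketch; operator.add is an associative accumulator and 0 its identity value):

    from operator import add

    from dev4py.utils.stream import Stream

    total = Stream.of(1, 2, 3, 4).reduce(0, add)
    # total == 10
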
def skip(self, n: int) -> Stream[~T]:
793    def skip(self, n: int) -> Stream[T]:
794        """
795        Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of
796        the stream
797
798        Note: If this stream contains fewer than n elements then an empty stream will be returned
799
800        Args:
801            n: The number of leading elements to skip
802
803        Returns:
804            Stream[T]: The new stream
805
806        Raises:
807            TypeError: if n is None
808
809        """
810        return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n)))

Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream

Note: If this stream contains fewer than n elements then an empty stream will be returned

Arguments:
  • n: The number of leading elements to skip
Returns:

Stream[T]: The new stream

Raises:
  • TypeError: if n is None
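
For example (illustrative sketch):

    from dev4py.utils.stream import Stream

    tail = Stream.of_iterable(range(5)).skip(2).to_list()
    # tail == [2, 3, 4]
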
def take_while(self, predicate: Callable[[~T], bool]) -> Stream[~T]:
812    def take_while(self, predicate: Predicate[T]) -> Stream[T]:
813        """
814        Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream
815        that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset
816        of elements taken from this stream that match the given predicate
817
818        Args:
819            predicate: The predicate to apply to elements to determine the longest prefix of elements
820
821        Returns:
822            Stream[T]: The new Stream
823
824        Raises:
825            TypeError: if predicate is None
826        """
827        return self._from_self_config(
828            values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate))
829        )

Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset of elements taken from this stream that match the given predicate

Arguments:
  • predicate: The predicate to apply to elements to determine the longest prefix of elements
Returns:

Stream[T]: The new Stream

Raises:
  • TypeError: if predicate is None
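
For example, on an ordered stream (illustrative sketch):

    from dev4py.utils.stream import Stream

    prefix = Stream.of(1, 2, 3, 4, 1).take_while(lambda x: x < 3).to_list()
    # prefix == [1, 2]: iteration stops at the first element that fails the predicate
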
def sorted( self, comparator: Callable[[~T, ~T], int] = <function _natural_comparator>) -> Stream[~T]:
831    def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]:
832        """
833        Returns a stream consisting of the elements of this stream, sorted according to the provided comparator
834
835        Args:
836            comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int,
837                zero, or a positive integer as the first argument is less than, equal to, or greater than the second)
838                (default: natural comparator)
839
840        Returns:
841            Stream[T]: The new Stream
842
843        Raises:
844            TypeError: if comparator is None
845        """
846        self._ordered_execution = True
847        return self._from_self_config(
848            values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator))
849        )

Returns a stream consisting of the elements of this stream, sorted according to the provided comparator

Arguments:
  • comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:

Stream[T]: The new Stream

Raises:
  • TypeError: if comparator is None
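
For example, natural order and a reversed comparator (illustrative sketch):

    from dev4py.utils.stream import Stream

    ascending = Stream.of(3, 1, 2).sorted().to_list()
    # ascending == [1, 2, 3]

    descending = Stream.of(3, 1, 2).sorted(lambda a, b: b - a).to_list()
    # descending == [3, 2, 1]
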
def to_generator(self) -> Iterator[~T]:
851    def to_generator(self) -> Iterator[T]:
852        """
853        Returns a generator based on Stream inputs and mapping
854
855        Note: If the Stream is sequential, each value is processed when __next__() is called. Otherwise, if the Stream
856        uses a parallel configuration, all values are processed at the first __next__() call but each value is returned
857        one by one depending on the 'ordered/unordered' configuration
858
859        Note 2: When the stream uses a parallel unordered configuration, each value is returned as soon as it is
860        available. With an ordered configuration, the generator waits for each value in order to respect the stream order
861
862        Returns:
863            Iterator[T]: The Stream processed values generator
864        """
865        # pylint: disable=E1102
866        values: Final[Iterable[Any]] = \
867            self._values_function(self._parallel_config.or_else(None), self._ordered_execution)
868        if not values:
869            return
870
871        if self.is_parallel():
872            yield from _parallel_generator(
873                values=values,
874                pipeline=self._pipeline,
875                parallel_config=self._parallel_config.get(),
876                ordered_execution=self._ordered_execution
877            )
878        else:
879            yield from _sync_generator(values=values, pipeline=self._pipeline)

Returns a generator based on Stream inputs and mapping

Note: If the Stream is sequential, each value is processed when __next__() is called. Otherwise, if the Stream uses a parallel configuration, all values are processed at the first __next__() call but each value is returned one by one depending on the 'ordered/unordered' configuration

Note 2: When the stream uses a parallel unordered configuration, each value is returned as soon as it is available. With an ordered configuration, the generator waits for each value in order to respect the stream value order

Returns:

Iterator[T]: The Stream processed values generator
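
For example, on a sequential stream where values are processed lazily as the generator is consumed (illustrative sketch):

    from dev4py.utils.stream import Stream

    generator = Stream.of(1, 2, 3).map(lambda x: x * 10).to_generator()
    for value in generator:
        print(value)
    # prints 10, 20 and 30
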