dev4py.utils.stream
Stream module inspired by java.util.stream
1"""Stream module inspired by `java.util.stream`""" # pylint: disable=C0302 2 3# Copyright 2022 the original author or authors (i.e.: St4rG00se for Dev4py). 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# https://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from __future__ import annotations 18 19from concurrent.futures import Executor, wait, FIRST_COMPLETED, Future, as_completed 20from dataclasses import dataclass 21from functools import partial, cmp_to_key 22from typing import Generic, Final, Optional, cast, Any, Collection, Iterable, Iterator, Self 23 24from dev4py.utils import collectors 25from dev4py.utils.collectors import Collector 26from dev4py.utils.iterables import get_chunks 27from dev4py.utils.joptional import JOptional 28from dev4py.utils.objects import require_non_none, require_non_none_else_get, to_self 29from dev4py.utils.pipeline import StepPipeline, StepResult 30from dev4py.utils.types import T, Function, R, V, Predicate, Supplier, BiConsumer, K, Consumer, BiFunction 31 32 33############################## 34# PRIVATE MODULE FUNCTIONS # 35############################## 36def _default_handler(v: V) -> StepResult[R]: 37 """ 38 private function to describe default pipeline handler 39 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 40 """ 41 # lambda v: StepResult(cast(R, v)) 42 return StepResult(cast(R, v)) 43 44 45def _root_pipeline() -> StepPipeline[V, R]: 46 """ 47 private function to describe default pipeline 48 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 49 """ 50 return StepPipeline.of(_default_handler) 51 52 53def _none_collector() -> Collector[T, R]: 54 """ 55 private function to cast none_collector 56 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 57 """ 58 return cast(Collector[T, R], collectors.to_none()) 59 60 61def _map_lambda(v: T, mapper: Function[T, R]) -> StepResult[R]: 62 """ 63 private function to describe map method handler 64 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 65 """ 66 # lambda v: StepResult(value=mapper(v)) 67 return StepResult(value=mapper(v)) 68 69 70def _filter_lambda(v: T, predicate: Predicate[T]) -> StepResult[T]: 71 """ 72 private function to describe filter method handler 73 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 74 """ 75 # lambda v: StepResult(value=v, go_next=predicate(v)) 76 return StepResult(value=v, go_next=predicate(v)) 77 78 79def _and_lambda(b1: bool, b2: bool) -> bool: 80 """ 81 private function to describe a AND BiFunction 82 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 83 """ 84 # lambda b1, b2: b1 and b2 85 return b1 and b2 86 87 88def _stream_to_list_lambda(stream: Stream[T]) -> list[T]: 89 """ 90 private function to describe a stream to list Function 91 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 92 Note: use lists because generators cannot be used in parallel context (generators are not serializable) 93 """ 94 # lambda stream: stream.to_list() 95 return stream.to_list() 96 97 98def _peek_mapper(value: T, consumer: Consumer[T]) -> T: 99 """ 100 private function to describe peek method mapper 101 Note: inner function are not used in order to be compatible with multiprocessing (inner function are not 102 serializable) 103 """ 104 consumer(value) 105 return value 106 107 108def _natural_comparator(o1: Any, o2: Any) -> int: 109 """ 110 private function to describe a natural order comparator 111 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 112 """ 113 return 0 if o1 == o2 else 1 if o1 > o2 else -1 # pragma: no mutate 114 115 116def _to_max_comparator(o1: T, o2: T, comparator: BiFunction[T, T, int]) -> int: 117 """ 118 private function to describe a reverse order comparator 119 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 120 """ 121 return -comparator(o1, o2) 122 123 124def _sync_execution( 125 values: Iterable[V], 126 pipeline: StepPipeline[V, T], 127 stop_on_first_completed: bool, 128 collector: Collector[T, R] 129) -> R: 130 """ 131 private function to describe a sync stream execution 132 Note: Outside the Stream class in order to be compatible with multiprocessing 133 """ 134 return _process_values(values, pipeline, stop_on_first_completed, collector)[0] 135 136 137def _parallel_execution( # pylint: disable=R0913 138 values: Iterable[V], 139 pipeline: StepPipeline[V, T], 140 parallel_config: ParallelConfiguration, 141 ordered_execution: bool, 142 stop_on_first_completed: bool, 143 collector: Collector[T, R] 144) -> R: 145 """ 146 private function to describe a parallel stream execution 147 Note: Outside the Stream class in order to be compatible with multiprocessing 148 """ 149 chunksize: int = parallel_config.chunksize 150 executor: Executor = parallel_config.executor 151 process_values: Final[partial[tuple[R, bool]]] = \ 152 partial( 153 _process_values, pipeline=pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector 154 ) 155 futures: Final[list[Future[tuple[R, bool]]]] = \ 156 [executor.submit(process_values, chunk) for chunk in get_chunks(values, chunksize)] 157 158 return (_ordered_parallel_execution if ordered_execution else _unordered_parallel_execution)( 159 futures=futures, 160 collector=collector 161 ) 162 163 164def _unordered_parallel_execution(futures: list[Future[tuple[R, bool]]], collector: Collector[T, R]) -> R: 165 """ 166 private function to describe an unordered parallel stream execution 167 Note: Outside the Stream class in order to be compatible with multiprocessing 168 """ 169 done: Collection[Future[tuple[R, bool]]] 170 not_done: Collection[Future[tuple[R, bool]]] = futures 171 172 try: 173 results: R = collector.supplier() 174 continue_processing: bool = True 175 176 # 'do {...} while' doesn't exist in python :'( 177 while not_done and (continue_processing is True): # pragma: no mutate 178 done, not_done = wait(not_done, return_when=FIRST_COMPLETED) 179 for future in done: 180 chunk_result: tuple[R, bool] = future.result() 181 results = collector.combiner(results, chunk_result[0]) 182 continue_processing = chunk_result[1] 183 if continue_processing is False: 184 break # pragma: no mutate 185 186 return results 187 finally: 188 for future in not_done: 189 future.cancel() 190 191 192def _ordered_parallel_execution(futures: list[Future[tuple[R, bool]]], collector: Collector[T, R]) -> R: 193 """ 194 private function to describe an ordered parallel stream execution 195 Note: Outside the Stream class in order to be compatible with multiprocessing 196 """ 197 results: R = collector.supplier() 198 processed_value: int = 0 # pragma: no mutate 199 try: 200 for future in futures: 201 processed_value += 1 # pragma: no mutate 202 chunk_result: tuple[R, bool] = future.result() 203 results = collector.combiner(results, chunk_result[0]) 204 if chunk_result[1] is False: 205 break # pragma: no mutate 206 finally: 207 for i in range(processed_value, len(futures)): 208 futures[i].cancel() 209 210 return results 211 212 213def _process_values( 214 values: Iterable[V], 215 pipeline: StepPipeline[V, T], 216 stop_on_first_completed: bool, 217 collector: Collector[T, R] 218) -> tuple[R, bool]: 219 """ 220 private function to describe process stream value set execution 221 Note: Outside the Stream class in order to be compatible with multiprocessing 222 """ 223 continue_processing: bool = True 224 # Note: Always ordered 225 results: R = collector.supplier() 226 for result in _sync_generator(values, pipeline): 227 results = collector.accumulator(results, result) 228 if stop_on_first_completed: 229 continue_processing = False # pragma: no mutate 230 break # pragma: no mutate 231 232 return results, continue_processing 233 234 235def _sync_generator(values: Iterable[V], pipeline: StepPipeline[V, T]) -> Iterable[T]: 236 """ 237 private function to describe a sync Stream generator 238 Note: Outside the Stream class in order to be compatible with multiprocessing 239 """ 240 for value in values: 241 result: StepResult[Any] = pipeline.execute(value) 242 # Note: go_next TRUE means the value has gone through the whole pipeline and has not been filtered 243 if result.go_next: 244 yield result.value 245 246 247def _parallel_generator( 248 values: Iterable[V], 249 pipeline: StepPipeline[V, T], 250 parallel_config: ParallelConfiguration, 251 ordered_execution: bool 252) -> Iterable[T]: 253 """ 254 private function to describe a parallel Stream generator (ordered or not) 255 Note: Outside the Stream class in order to be compatible with multiprocessing 256 """ 257 chunksize: int = parallel_config.chunksize 258 executor: Executor = parallel_config.executor 259 process_values: Final[partial[tuple[list[T], bool]]] = \ 260 partial(_process_values, pipeline=pipeline, stop_on_first_completed=False, collector=collectors.to_list()) 261 futures: Final[list[Future[tuple[list[T], bool]]]] = \ 262 [executor.submit(process_values, chunk) for chunk in get_chunks(values, chunksize)] 263 try: 264 futures_iterable: Final[Iterable[Future[tuple[list[T], bool]]]] = \ 265 (futures if ordered_execution else as_completed(futures)) 266 for future in futures_iterable: 267 yield from future.result()[0] 268 269 except BaseException as e: 270 for future in futures: 271 future.cancel() 272 raise e 273 274 275############################## 276# PUBLIC CLASSES # 277############################## 278@dataclass(frozen=True) 279class ParallelConfiguration: 280 """ 281 ParallelConfiguration class that describes Stream parallel configuration when using parallel execution 282 283 Args: 284 executor (Executor): The executor to use for parallel (/concurrent) execution 285 chunksize (int): If greater than one, the stream values will be chopped into chunks of size chunksize and 286 submitted to the process pool. If set to one, the items in the stream will be sent one at a time. 287 288 Raises: 289 TypeError: if executor is None 290 ValueError: if chunksize is less than 1 291 """ 292 executor: Executor 293 chunksize: int = 1 294 295 def __post_init__(self): 296 require_non_none(self.executor, "executor must be non None") 297 if require_non_none(self.chunksize, "chunksize must be non None") < 1: 298 raise ValueError("chunksize must be greater than or equals to 1") 299 300 301class Stream(Generic[T]): # pylint: disable=R0904 302 __CREATE_KEY: Final[object] = object() 303 304 @classmethod 305 def empty(cls) -> Stream[T]: 306 """ 307 Returns an empty sequential Stream 308 309 Returns: 310 Stream[T]: An empty sequential Stream 311 """ 312 return cls.of() 313 314 @classmethod 315 def of(cls, *values: T) -> Stream[T]: 316 """ 317 Returns a sequential ordered stream whose elements are the specified values 318 319 Args: 320 *values: ordered stream elements 321 322 Returns: 323 Stream[T]: A sequential ordered stream whose elements are the specified values 324 """ 325 return cls.of_iterable(values) 326 327 @classmethod 328 def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]: 329 """ 330 Returns a sequential ordered stream whose elements are values from the given Iterable 331 332 Args: 333 iterable: iterable of stream source values 334 335 Returns: 336 Stream[T]: A sequential ordered stream whose elements are values from the given Iterable 337 338 Raises: 339 TypeError: if iterable is None 340 """ 341 require_non_none(iterable) 342 return cls._of(lambda p, o: iterable) 343 344 @classmethod 345 def _of( 346 cls, 347 values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]], 348 pipeline: Optional[StepPipeline[V, R]] = None, 349 parallel_config: Optional[JOptional[ParallelConfiguration]] = None, 350 ordered_execution: bool = False 351 ) -> Stream[R]: 352 """ 353 Private class method in order to simplify the call to Stream private constructor 354 355 Args: 356 values_function: The BiFunction which provides the Stream values 357 pipeline: The StepPipeline to be executed if presents by the Stream values on terminal operation 358 parallel_config: The Stream parallel configuration 359 ordered_execution: TRUE if the value must be returned to the encounter order 360 361 Returns: 362 Stream[T]: A Stream corresponding to the given parameters 363 """ 364 return Stream( 365 values_function=values_function, 366 pipeline=require_non_none_else_get(pipeline, _root_pipeline), 367 parallel_config=require_non_none_else_get(parallel_config, JOptional.empty), 368 ordered_execution=ordered_execution, 369 create_key=cls.__CREATE_KEY 370 ) 371 372 def __init__( # pylint: disable=R0913 373 self, 374 values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]], 375 pipeline: StepPipeline[V, T], 376 parallel_config: JOptional[ParallelConfiguration], 377 ordered_execution: bool, 378 create_key: object, 379 ): 380 """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`""" 381 assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of" 382 self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \ 383 require_non_none(values_function) 384 self._pipeline: StepPipeline[V, T] = require_non_none(pipeline) 385 self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config) 386 self._ordered_execution: bool = require_non_none(ordered_execution) 387 388 def map(self, mapper: Function[T, R]) -> Stream[R]: 389 """ 390 Returns a stream consisting of the results of applying the given function to the elements of this stream 391 392 Args: 393 mapper (Function[T, R]): The function to apply to each element 394 395 Returns: 396 Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream 397 398 Raises: 399 TypeError: if mapper is None 400 """ 401 require_non_none(mapper) 402 return Stream._of( 403 values_function=self._values_function, 404 # pylint: disable=E1101 405 pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))), 406 parallel_config=self._parallel_config, 407 ordered_execution=self._ordered_execution 408 ) 409 410 def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self: 411 """ 412 Configures the Stream to use the given parallel configuration and return the current stream 413 414 Note: None parallel configuration is like calling `sequential()` method 415 416 Args: 417 parallel_config: The parallel configuration to use 418 419 Returns: 420 Stream[T]: The current Stream after set parallel configuration 421 """ 422 self._parallel_config = JOptional.of_noneable(parallel_config) 423 return self 424 425 def is_parallel(self) -> bool: 426 """ 427 Returns whether this stream, if a terminal operation were to be executed, would execute in parallel 428 429 Returns: 430 bool: True if the Stream uses parallel configuration, otherwise False 431 """ 432 return self._parallel_config.is_present() 433 434 def sequential(self) -> Self: 435 """ 436 Configures the Stream to use sequential configuration and return the current stream 437 (i.e.: Remove the parallel configuration if exists) 438 439 Returns: 440 Stream[T]: The current Stream after set sequential configuration 441 """ 442 if self.is_parallel(): 443 self._parallel_config = JOptional.empty() 444 return self 445 446 def filter(self, predicate: Predicate[T]) -> Stream[T]: 447 """ 448 Returns a stream consisting of the elements of this stream that match the given predicate 449 450 Args: 451 predicate: The predicate to apply to each element to determine if it should be included 452 453 Returns: 454 Stream[T]: A stream consisting of the elements of this stream that match the given predicate 455 456 Raises: 457 TypeError: if predicate is None 458 """ 459 return Stream._of( 460 values_function=self._values_function, 461 # pylint: disable=E1101 462 pipeline=self._pipeline.add_handler( 463 cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate))) 464 ), 465 parallel_config=self._parallel_config, 466 ordered_execution=self._ordered_execution 467 ) 468 469 def collect(self, collector: Collector[T, R]) -> R: 470 """ 471 Performs a reduction operation on the elements of this stream using a Collector 472 473 Args: 474 collector: The Collector describing the reduction 475 476 Returns: 477 R: The result of the reduction 478 479 Raises: 480 TypeError: if collector is None 481 """ 482 return self._execute(collector=require_non_none(collector)) 483 484 def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R: 485 """ 486 Performs a reduction operation on the elements of this stream 487 488 Args: 489 supplier: A function that creates a new result container. For a parallel execution, this function may be 490 called multiple times and must return a fresh value each time 491 accumulator: A function that must fold an element into a result container 492 combiner: A unction that accepts two partial result containers and merges them, which must be compatible 493 with the accumulator function. The combiner function must fold the elements from the second result 494 container into the first result container 495 496 Returns: 497 R: The result of the reduction 498 499 Raises: 500 TypeError: if at least one parameter is None 501 """ 502 return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner)) 503 504 def to_list(self) -> list[T]: 505 """ 506 Accumulates the elements of this stream into a list 507 508 Returns: 509 list[T]: A list containing the stream elements 510 """ 511 return self.collect(collector=collectors.to_list()) 512 513 def to_tuple(self) -> tuple[T, ...]: 514 """ 515 Accumulates the elements of this stream into a tuple 516 517 Returns: 518 tuple[T, ...]: a tuple containing the stream elements 519 """ 520 return self.collect(collector=collectors.to_tuple()) 521 522 def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]: 523 """ 524 Accumulates the elements of this stream into a tuple dict whose keys and values are the result of applying the 525 provided mapping functions to the input elements 526 527 Args: 528 key_mapper: a mapping function to produce keys 529 value_mapper: a mapping function to produce values 530 531 Returns: 532 dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping 533 functions to the input elements 534 """ 535 return self.collect(collector=collectors.to_dict(key_mapper, value_mapper)) 536 537 def all_match(self, predicate: Predicate[T]) -> bool: 538 """ 539 Returns whether all elements of this stream match the provided predicate 540 541 Args: 542 predicate: The predicate to apply to elements of this stream 543 544 Returns: 545 bool: True if either all elements of the stream match the provided predicate or the stream is empty, 546 otherwise False 547 548 Raises: 549 TypeError: if predicate is None 550 """ 551 return self.map(predicate).reduce(True, _and_lambda) 552 553 def any_match(self, predicate: Predicate[T]) -> bool: 554 """ 555 Returns whether any elements of this stream match the provided predicate 556 557 Args: 558 predicate: The predicate to apply to elements of this stream 559 560 Returns: 561 bool: True if any elements of the stream match the provided predicate, otherwise False 562 563 Raises: 564 TypeError: if predicate is None 565 """ 566 return self.filter(predicate).find_any().is_present() 567 568 def count(self) -> int: 569 """ 570 Returns the count of elements in this stream 571 572 Returns: 573 int: The count of elements in this stream 574 """ 575 return self.collect(collector=collectors.to_counter()) 576 577 def for_each(self, consumer: Consumer[T]) -> None: 578 """ 579 Performs an action (/consumer) for each element of this stream 580 581 Args: 582 consumer: The Consumer to perform on the elements 583 584 Returns: 585 None: nothing 586 587 Raises: 588 TypeError: if consumer is None 589 """ 590 self.peek(require_non_none(consumer)) 591 self._execute() 592 593 def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]: 594 """ 595 Returns a stream consisting of the results of replacing each element of this stream with the contents of a 596 mapped stream produced by applying the provided mapping function to each element 597 598 Args: 599 mapper: A function to apply to each element which produces a stream of new values 600 601 Returns: 602 Stream[R]: The new Stream 603 604 Raises: 605 TypeError: if mapper is None 606 """ 607 return self._from_self_config( 608 values_function=cast( 609 BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]], 610 partial(self._flat_map_values_function, mapper=require_non_none(mapper)) 611 ) 612 ) 613 614 def ordered_execution(self, ordered: bool = True) -> Stream[T]: 615 """ 616 Configures the current Stream to be ordered or not and returns it 617 618 Note: This configuration has no impact in sequential stream 619 620 Args: 621 ordered: True if the Stream must be ordered, False for unordered. (Default: True) 622 623 Returns: 624 Stream[T]: The current Stream with ordered configuration 625 626 Raises: 627 TypeError: if ordered is None 628 """ 629 return Stream._of( 630 values_function=self._values_function, 631 pipeline=self._pipeline, 632 parallel_config=self._parallel_config, 633 ordered_execution=ordered 634 ) if self._ordered_execution != require_non_none(ordered) else self 635 636 def unordered(self) -> Stream[T]: 637 """ 638 Configures the current Stream to be unordered and returns it 639 640 Note: Is equivalent to call `ordered_execution(False)` 641 Note 2: This configuration has no impact in sequential stream 642 643 Returns: 644 Stream[T]: The current Stream with unordered configuration 645 """ 646 return self.ordered_execution(False) 647 648 def find_first(self) -> JOptional[T]: 649 """ 650 Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty 651 652 Returns: 653 JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream 654 is empty 655 """ 656 return self.ordered_execution(True).find_any() 657 658 def find_any(self) -> JOptional[T]: 659 """ 660 Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty 661 662 Note: if the stream is ordered, find_any and find_first are equivalents 663 664 Returns: 665 JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is 666 empty 667 """ 668 results: list[T] = self._execute( 669 stop_on_first_completed=True # pragma: no mutate 670 , collector=collectors.to_list() 671 ) 672 return JOptional.of_noneable(results[0] if results else None) 673 674 def distinct(self) -> Stream[T]: 675 """ 676 Returns a stream consisting of the distinct elements of this stream 677 678 Returns: 679 Stream[T]: A stream consisting of the distinct elements of this stream 680 """ 681 return self._from_self_config(values_function=self._distinct_values_function) 682 683 def drop_while(self, predicate: Predicate[T]) -> Stream[T]: 684 """ 685 Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping 686 the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a 687 stream consisting of the remaining elements of this stream after dropping a subset of elements that match the 688 given predicate. 689 690 Args: 691 predicate: The predicate to apply to elements to determine the longest prefix of elements 692 693 Returns: 694 Stream[T]: The new Stream 695 696 Raises: 697 TypeError: if predicate is None 698 """ 699 return self._from_self_config( 700 values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate)) 701 ) 702 703 def limit(self, limit: int) -> Stream[T]: 704 """ 705 Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length 706 707 Args: 708 limit: The number of elements the stream should be limited to 709 710 Returns: 711 Stream[T]: The truncated Stream 712 713 Raises: 714 TypeError: if limit is None 715 """ 716 return self._from_self_config( 717 values_function=partial(self._limit_values_function, limit=require_non_none(limit))) 718 719 def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 720 """ 721 Returns the maximum element of this stream according to the provided comparator BiFunction 722 723 Args: 724 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 725 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 726 (default: natural comparator) 727 728 Returns: 729 JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream 730 is empty 731 732 Raises: 733 TypeError: if comparator is None 734 """ 735 return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator))) 736 737 def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 738 """ 739 Returns the minimum element of this stream according to the provided comparator BiFunction 740 741 Args: 742 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 743 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 744 (default: natural comparator) 745 746 Returns: 747 JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream 748 is empty 749 750 Raises: 751 TypeError: if comparator is None 752 """ 753 return self.sorted(require_non_none(comparator)).find_first() 754 755 def peek(self, consumer: Consumer[T]) -> Stream[T]: 756 """ 757 Returns a stream consisting of the elements of this stream, additionally performing the provided action 758 (/consumer) on each element as elements are consumed from the resulting stream 759 760 Args: 761 consumer: The Consumer to perform on the elements as they are consumed from the stream 762 763 Returns: 764 Stream[T]: The new Stream 765 766 Raises: 767 TypeError: if consumer is None 768 """ 769 return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer)))) 770 771 def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T: 772 """ 773 Performs a reduction on the elements of this stream, using the provided identity value and an associative 774 accumulation function, and returns the reduced value 775 776 Args: 777 identity: The identity value for the accumulating function 778 accumulator: The BiFunction for combining two values 779 780 Returns: 781 T: The result of the reduction 782 783 Raises: 784 TypeError: accumulator is None 785 """ 786 return self.collect(collectors.of(supplier=partial(to_self, obj=identity), 787 accumulator=accumulator, 788 combiner=accumulator 789 ) 790 ) 791 792 def skip(self, n: int) -> Stream[T]: 793 """ 794 Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of 795 the stream 796 797 Note: If this stream contains fewer than n elements then an empty stream will be returned 798 799 Args: 800 n: The number of leading elements to skip 801 802 Returns: 803 Stream[T]: The new stream 804 805 Raises: 806 TypeError: if n is None 807 808 """ 809 return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n))) 810 811 def take_while(self, predicate: Predicate[T]) -> Stream[T]: 812 """ 813 Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream 814 that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset 815 of elements taken from this stream that match the given predicate 816 817 Args: 818 predicate: The predicate to apply to elements to determine the longest prefix of element 819 820 Returns: 821 Stream[T]: The new Stream 822 823 Raises: 824 TypeError: if predicate is None 825 """ 826 return self._from_self_config( 827 values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate)) 828 ) 829 830 def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]: 831 """ 832 Returns a stream consisting of the elements of this stream, sorted according to the provided comparator 833 834 Args: 835 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 836 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 837 (default: natural comparator) 838 839 Returns: 840 Stream[T]: The new Stream 841 842 Raises: 843 TypeError: if comparator is None 844 """ 845 self._ordered_execution = True 846 return self._from_self_config( 847 values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator)) 848 ) 849 850 def to_generator(self) -> Iterator[T]: 851 """ 852 Returns a generator based on Stream inputs and mapping 853 854 Note: If the Stream is sequential each value is processed when __next__() is called. Otherwise, if the Stream 855 uses a parallel configuration, all values are processed at the first __next__() call but each value is returned 856 one by one depending on the 'ordered/unordered' configuration 857 858 Note 2: When stream uses parallel with unordered configuration each value is returned when it is available. With 859 an ordered configuration the generator wait for each necessary value in order to respect the stream value order 860 861 Returns: 862 Iterator[T]: The Stream processed values generator 863 """ 864 # pylint: disable=E1102 865 values: Final[Iterable[Any]] = \ 866 self._values_function(self._parallel_config.or_else(None), self._ordered_execution) 867 if not values: 868 return 869 870 if self.is_parallel(): 871 yield from _parallel_generator( 872 values=values, 873 pipeline=self._pipeline, 874 parallel_config=self._parallel_config.get(), 875 ordered_execution=self._ordered_execution 876 ) 877 else: 878 yield from _sync_generator(values=values, pipeline=self._pipeline) 879 880 def _execute( 881 self, 882 stop_on_first_completed: bool = False, 883 collector: Collector[T, R] = _none_collector() 884 ) -> R: 885 """ 886 private method to execute the current Stream 887 """ 888 require_non_none(stop_on_first_completed) 889 require_non_none(collector) 890 # pylint: disable=E1102 891 values: Final[Optional[Iterable[Any]]] = \ 892 self._values_function(self._parallel_config.or_else(None), self._ordered_execution) 893 894 if not values: 895 return collector.supplier() 896 897 if self.is_parallel(): 898 return _parallel_execution( 899 values=values, 900 pipeline=self._pipeline, 901 parallel_config=self._parallel_config.get(), 902 ordered_execution=self._ordered_execution, 903 stop_on_first_completed=stop_on_first_completed, 904 collector=collector 905 ) 906 907 return _sync_execution( 908 values=values, pipeline=self._pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector 909 ) 910 911 def _from_self_config( 912 self, values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] 913 ) -> Stream[R]: 914 """ 915 private method that returns a new Stream by copying the current configuration 916 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 917 """ 918 return Stream._of( 919 values_function=values_function, 920 parallel_config=self._parallel_config, 921 ordered_execution=self._ordered_execution 922 ) 923 924 def _init_to_values_function( 925 self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool 926 ) -> Stream[T]: 927 """ 928 private method that initializes the current stream with the given configuration 929 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 930 """ 931 return self.ordered_execution(ordered=ordered_execution).parallel(parallel_config) 932 933 def _sorted_values_function( 934 self, parallel_config: ParallelConfiguration, ordered_execution: bool, comparator: BiFunction[T, T, int] 935 ) -> Iterable[T]: 936 """ 937 private method used by `sorted` public method in replacement of lambda 938 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 939 """ 940 values: list[T] = self._init_to_values_function(parallel_config, ordered_execution).to_list() 941 values.sort(key=cmp_to_key(comparator)) 942 return values 943 944 def _flat_map_values_function( 945 self, parallel_config: Optional[ParallelConfiguration], 946 ordered_execution: bool, 947 mapper: Function[T, Stream[R]] 948 ) -> Iterable[R]: 949 """ 950 private method used by `flat_map` public method in replacement of lambda 951 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 952 """ 953 list_generator: Final[Iterable[list[R]]] = ( 954 self 955 ._init_to_values_function(parallel_config, ordered_execution) 956 .map(mapper) 957 .map(_stream_to_list_lambda) # type: ignore 958 .to_generator() 959 ) 960 for values in list_generator: 961 yield from values 962 963 def _distinct_values_function( 964 self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool 965 ) -> Iterable[T]: 966 """ 967 private method used by `distinct` public method in replacement of lambda 968 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 969 """ 970 existing_values: set[T] = set() 971 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 972 if not value in existing_values: 973 existing_values.add(value) 974 yield value 975 976 def _drop_while_values_function( 977 self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T] 978 ) -> Iterable[T]: 979 """ 980 private method used by `drop_while` public method in replacement of lambda 981 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 982 """ 983 drop: bool = True 984 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 985 drop = drop and predicate(value) 986 if drop is False: 987 yield value 988 989 def _limit_values_function( 990 self, parallel_config: ParallelConfiguration, ordered_execution: bool, limit: int 991 ) -> Iterable[T]: 992 """ 993 private method used by `limit` public method in replacement of lambda 994 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 995 """ 996 if limit <= 0: 997 return 998 999 counter: int = 0 1000 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1001 yield value 1002 counter += 1 1003 if counter >= limit: 1004 break 1005 1006 def _skip_values_function( 1007 self, parallel_config: ParallelConfiguration, ordered_execution: bool, n: int 1008 ) -> Iterable[T]: 1009 """ 1010 private method used by `skip` public method in replacement of lambda 1011 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 1012 """ 1013 counter: int = 0 1014 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1015 if counter >= n: 1016 yield value 1017 counter += 1 1018 1019 def _take_while_values_function( 1020 self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T] 1021 ) -> Iterable[T]: 1022 """ 1023 private method used by `take_while` public method in replacement of lambda 1024 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 1025 """ 1026 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1027 if not predicate(value): 1028 break # pragma: no mutate 1029 yield value
279@dataclass(frozen=True) 280class ParallelConfiguration: 281 """ 282 ParallelConfiguration class that describes Stream parallel configuration when using parallel execution 283 284 Args: 285 executor (Executor): The executor to use for parallel (/concurrent) execution 286 chunksize (int): If greater than one, the stream values will be chopped into chunks of size chunksize and 287 submitted to the process pool. If set to one, the items in the stream will be sent one at a time. 288 289 Raises: 290 TypeError: if executor is None 291 ValueError: if chunksize is less than 1 292 """ 293 executor: Executor 294 chunksize: int = 1 295 296 def __post_init__(self): 297 require_non_none(self.executor, "executor must be non None") 298 if require_non_none(self.chunksize, "chunksize must be non None") < 1: 299 raise ValueError("chunksize must be greater than or equals to 1")
ParallelConfiguration class that describes Stream parallel configuration when using parallel execution
Arguments:
- executor (Executor): The executor to use for parallel (/concurrent) execution
- chunksize (int): If greater than one, the stream values will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the stream will be sent one at a time.
Raises:
- TypeError: if executor is None
- ValueError: if chunksize is less than 1
302class Stream(Generic[T]): # pylint: disable=R0904 303 __CREATE_KEY: Final[object] = object() 304 305 @classmethod 306 def empty(cls) -> Stream[T]: 307 """ 308 Returns an empty sequential Stream 309 310 Returns: 311 Stream[T]: An empty sequential Stream 312 """ 313 return cls.of() 314 315 @classmethod 316 def of(cls, *values: T) -> Stream[T]: 317 """ 318 Returns a sequential ordered stream whose elements are the specified values 319 320 Args: 321 *values: ordered stream elements 322 323 Returns: 324 Stream[T]: A sequential ordered stream whose elements are the specified values 325 """ 326 return cls.of_iterable(values) 327 328 @classmethod 329 def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]: 330 """ 331 Returns a sequential ordered stream whose elements are values from the given Iterable 332 333 Args: 334 iterable: iterable of stream source values 335 336 Returns: 337 Stream[T]: A sequential ordered stream whose elements are values from the given Iterable 338 339 Raises: 340 TypeError: if iterable is None 341 """ 342 require_non_none(iterable) 343 return cls._of(lambda p, o: iterable) 344 345 @classmethod 346 def _of( 347 cls, 348 values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]], 349 pipeline: Optional[StepPipeline[V, R]] = None, 350 parallel_config: Optional[JOptional[ParallelConfiguration]] = None, 351 ordered_execution: bool = False 352 ) -> Stream[R]: 353 """ 354 Private class method in order to simplify the call to Stream private constructor 355 356 Args: 357 values_function: The BiFunction which provides the Stream values 358 pipeline: The StepPipeline to be executed if presents by the Stream values on terminal operation 359 parallel_config: The Stream parallel configuration 360 ordered_execution: TRUE if the value must be returned to the encounter order 361 362 Returns: 363 Stream[T]: A Stream corresponding to the given parameters 364 """ 365 return Stream( 366 values_function=values_function, 367 pipeline=require_non_none_else_get(pipeline, _root_pipeline), 368 parallel_config=require_non_none_else_get(parallel_config, JOptional.empty), 369 ordered_execution=ordered_execution, 370 create_key=cls.__CREATE_KEY 371 ) 372 373 def __init__( # pylint: disable=R0913 374 self, 375 values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]], 376 pipeline: StepPipeline[V, T], 377 parallel_config: JOptional[ParallelConfiguration], 378 ordered_execution: bool, 379 create_key: object, 380 ): 381 """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`""" 382 assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of" 383 self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \ 384 require_non_none(values_function) 385 self._pipeline: StepPipeline[V, T] = require_non_none(pipeline) 386 self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config) 387 self._ordered_execution: bool = require_non_none(ordered_execution) 388 389 def map(self, mapper: Function[T, R]) -> Stream[R]: 390 """ 391 Returns a stream consisting of the results of applying the given function to the elements of this stream 392 393 Args: 394 mapper (Function[T, R]): The function to apply to each element 395 396 Returns: 397 Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream 398 399 Raises: 400 TypeError: if mapper is None 401 """ 402 require_non_none(mapper) 403 return Stream._of( 404 values_function=self._values_function, 405 # pylint: disable=E1101 406 pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))), 407 parallel_config=self._parallel_config, 408 ordered_execution=self._ordered_execution 409 ) 410 411 def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self: 412 """ 413 Configures the Stream to use the given parallel configuration and return the current stream 414 415 Note: None parallel configuration is like calling `sequential()` method 416 417 Args: 418 parallel_config: The parallel configuration to use 419 420 Returns: 421 Stream[T]: The current Stream after set parallel configuration 422 """ 423 self._parallel_config = JOptional.of_noneable(parallel_config) 424 return self 425 426 def is_parallel(self) -> bool: 427 """ 428 Returns whether this stream, if a terminal operation were to be executed, would execute in parallel 429 430 Returns: 431 bool: True if the Stream uses parallel configuration, otherwise False 432 """ 433 return self._parallel_config.is_present() 434 435 def sequential(self) -> Self: 436 """ 437 Configures the Stream to use sequential configuration and return the current stream 438 (i.e.: Remove the parallel configuration if exists) 439 440 Returns: 441 Stream[T]: The current Stream after set sequential configuration 442 """ 443 if self.is_parallel(): 444 self._parallel_config = JOptional.empty() 445 return self 446 447 def filter(self, predicate: Predicate[T]) -> Stream[T]: 448 """ 449 Returns a stream consisting of the elements of this stream that match the given predicate 450 451 Args: 452 predicate: The predicate to apply to each element to determine if it should be included 453 454 Returns: 455 Stream[T]: A stream consisting of the elements of this stream that match the given predicate 456 457 Raises: 458 TypeError: if predicate is None 459 """ 460 return Stream._of( 461 values_function=self._values_function, 462 # pylint: disable=E1101 463 pipeline=self._pipeline.add_handler( 464 cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate))) 465 ), 466 parallel_config=self._parallel_config, 467 ordered_execution=self._ordered_execution 468 ) 469 470 def collect(self, collector: Collector[T, R]) -> R: 471 """ 472 Performs a reduction operation on the elements of this stream using a Collector 473 474 Args: 475 collector: The Collector describing the reduction 476 477 Returns: 478 R: The result of the reduction 479 480 Raises: 481 TypeError: if collector is None 482 """ 483 return self._execute(collector=require_non_none(collector)) 484 485 def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R: 486 """ 487 Performs a reduction operation on the elements of this stream 488 489 Args: 490 supplier: A function that creates a new result container. For a parallel execution, this function may be 491 called multiple times and must return a fresh value each time 492 accumulator: A function that must fold an element into a result container 493 combiner: A unction that accepts two partial result containers and merges them, which must be compatible 494 with the accumulator function. The combiner function must fold the elements from the second result 495 container into the first result container 496 497 Returns: 498 R: The result of the reduction 499 500 Raises: 501 TypeError: if at least one parameter is None 502 """ 503 return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner)) 504 505 def to_list(self) -> list[T]: 506 """ 507 Accumulates the elements of this stream into a list 508 509 Returns: 510 list[T]: A list containing the stream elements 511 """ 512 return self.collect(collector=collectors.to_list()) 513 514 def to_tuple(self) -> tuple[T, ...]: 515 """ 516 Accumulates the elements of this stream into a tuple 517 518 Returns: 519 tuple[T, ...]: a tuple containing the stream elements 520 """ 521 return self.collect(collector=collectors.to_tuple()) 522 523 def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]: 524 """ 525 Accumulates the elements of this stream into a tuple dict whose keys and values are the result of applying the 526 provided mapping functions to the input elements 527 528 Args: 529 key_mapper: a mapping function to produce keys 530 value_mapper: a mapping function to produce values 531 532 Returns: 533 dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping 534 functions to the input elements 535 """ 536 return self.collect(collector=collectors.to_dict(key_mapper, value_mapper)) 537 538 def all_match(self, predicate: Predicate[T]) -> bool: 539 """ 540 Returns whether all elements of this stream match the provided predicate 541 542 Args: 543 predicate: The predicate to apply to elements of this stream 544 545 Returns: 546 bool: True if either all elements of the stream match the provided predicate or the stream is empty, 547 otherwise False 548 549 Raises: 550 TypeError: if predicate is None 551 """ 552 return self.map(predicate).reduce(True, _and_lambda) 553 554 def any_match(self, predicate: Predicate[T]) -> bool: 555 """ 556 Returns whether any elements of this stream match the provided predicate 557 558 Args: 559 predicate: The predicate to apply to elements of this stream 560 561 Returns: 562 bool: True if any elements of the stream match the provided predicate, otherwise False 563 564 Raises: 565 TypeError: if predicate is None 566 """ 567 return self.filter(predicate).find_any().is_present() 568 569 def count(self) -> int: 570 """ 571 Returns the count of elements in this stream 572 573 Returns: 574 int: The count of elements in this stream 575 """ 576 return self.collect(collector=collectors.to_counter()) 577 578 def for_each(self, consumer: Consumer[T]) -> None: 579 """ 580 Performs an action (/consumer) for each element of this stream 581 582 Args: 583 consumer: The Consumer to perform on the elements 584 585 Returns: 586 None: nothing 587 588 Raises: 589 TypeError: if consumer is None 590 """ 591 self.peek(require_non_none(consumer)) 592 self._execute() 593 594 def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]: 595 """ 596 Returns a stream consisting of the results of replacing each element of this stream with the contents of a 597 mapped stream produced by applying the provided mapping function to each element 598 599 Args: 600 mapper: A function to apply to each element which produces a stream of new values 601 602 Returns: 603 Stream[R]: The new Stream 604 605 Raises: 606 TypeError: if mapper is None 607 """ 608 return self._from_self_config( 609 values_function=cast( 610 BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]], 611 partial(self._flat_map_values_function, mapper=require_non_none(mapper)) 612 ) 613 ) 614 615 def ordered_execution(self, ordered: bool = True) -> Stream[T]: 616 """ 617 Configures the current Stream to be ordered or not and returns it 618 619 Note: This configuration has no impact in sequential stream 620 621 Args: 622 ordered: True if the Stream must be ordered, False for unordered. (Default: True) 623 624 Returns: 625 Stream[T]: The current Stream with ordered configuration 626 627 Raises: 628 TypeError: if ordered is None 629 """ 630 return Stream._of( 631 values_function=self._values_function, 632 pipeline=self._pipeline, 633 parallel_config=self._parallel_config, 634 ordered_execution=ordered 635 ) if self._ordered_execution != require_non_none(ordered) else self 636 637 def unordered(self) -> Stream[T]: 638 """ 639 Configures the current Stream to be unordered and returns it 640 641 Note: Is equivalent to call `ordered_execution(False)` 642 Note 2: This configuration has no impact in sequential stream 643 644 Returns: 645 Stream[T]: The current Stream with unordered configuration 646 """ 647 return self.ordered_execution(False) 648 649 def find_first(self) -> JOptional[T]: 650 """ 651 Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty 652 653 Returns: 654 JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream 655 is empty 656 """ 657 return self.ordered_execution(True).find_any() 658 659 def find_any(self) -> JOptional[T]: 660 """ 661 Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty 662 663 Note: if the stream is ordered, find_any and find_first are equivalents 664 665 Returns: 666 JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is 667 empty 668 """ 669 results: list[T] = self._execute( 670 stop_on_first_completed=True # pragma: no mutate 671 , collector=collectors.to_list() 672 ) 673 return JOptional.of_noneable(results[0] if results else None) 674 675 def distinct(self) -> Stream[T]: 676 """ 677 Returns a stream consisting of the distinct elements of this stream 678 679 Returns: 680 Stream[T]: A stream consisting of the distinct elements of this stream 681 """ 682 return self._from_self_config(values_function=self._distinct_values_function) 683 684 def drop_while(self, predicate: Predicate[T]) -> Stream[T]: 685 """ 686 Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping 687 the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a 688 stream consisting of the remaining elements of this stream after dropping a subset of elements that match the 689 given predicate. 690 691 Args: 692 predicate: The predicate to apply to elements to determine the longest prefix of elements 693 694 Returns: 695 Stream[T]: The new Stream 696 697 Raises: 698 TypeError: if predicate is None 699 """ 700 return self._from_self_config( 701 values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate)) 702 ) 703 704 def limit(self, limit: int) -> Stream[T]: 705 """ 706 Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length 707 708 Args: 709 limit: The number of elements the stream should be limited to 710 711 Returns: 712 Stream[T]: The truncated Stream 713 714 Raises: 715 TypeError: if limit is None 716 """ 717 return self._from_self_config( 718 values_function=partial(self._limit_values_function, limit=require_non_none(limit))) 719 720 def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 721 """ 722 Returns the maximum element of this stream according to the provided comparator BiFunction 723 724 Args: 725 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 726 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 727 (default: natural comparator) 728 729 Returns: 730 JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream 731 is empty 732 733 Raises: 734 TypeError: if comparator is None 735 """ 736 return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator))) 737 738 def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 739 """ 740 Returns the minimum element of this stream according to the provided comparator BiFunction 741 742 Args: 743 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 744 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 745 (default: natural comparator) 746 747 Returns: 748 JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream 749 is empty 750 751 Raises: 752 TypeError: if comparator is None 753 """ 754 return self.sorted(require_non_none(comparator)).find_first() 755 756 def peek(self, consumer: Consumer[T]) -> Stream[T]: 757 """ 758 Returns a stream consisting of the elements of this stream, additionally performing the provided action 759 (/consumer) on each element as elements are consumed from the resulting stream 760 761 Args: 762 consumer: The Consumer to perform on the elements as they are consumed from the stream 763 764 Returns: 765 Stream[T]: The new Stream 766 767 Raises: 768 TypeError: if consumer is None 769 """ 770 return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer)))) 771 772 def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T: 773 """ 774 Performs a reduction on the elements of this stream, using the provided identity value and an associative 775 accumulation function, and returns the reduced value 776 777 Args: 778 identity: The identity value for the accumulating function 779 accumulator: The BiFunction for combining two values 780 781 Returns: 782 T: The result of the reduction 783 784 Raises: 785 TypeError: accumulator is None 786 """ 787 return self.collect(collectors.of(supplier=partial(to_self, obj=identity), 788 accumulator=accumulator, 789 combiner=accumulator 790 ) 791 ) 792 793 def skip(self, n: int) -> Stream[T]: 794 """ 795 Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of 796 the stream 797 798 Note: If this stream contains fewer than n elements then an empty stream will be returned 799 800 Args: 801 n: The number of leading elements to skip 802 803 Returns: 804 Stream[T]: The new stream 805 806 Raises: 807 TypeError: if n is None 808 809 """ 810 return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n))) 811 812 def take_while(self, predicate: Predicate[T]) -> Stream[T]: 813 """ 814 Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream 815 that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset 816 of elements taken from this stream that match the given predicate 817 818 Args: 819 predicate: The predicate to apply to elements to determine the longest prefix of element 820 821 Returns: 822 Stream[T]: The new Stream 823 824 Raises: 825 TypeError: if predicate is None 826 """ 827 return self._from_self_config( 828 values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate)) 829 ) 830 831 def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]: 832 """ 833 Returns a stream consisting of the elements of this stream, sorted according to the provided comparator 834 835 Args: 836 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 837 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 838 (default: natural comparator) 839 840 Returns: 841 Stream[T]: The new Stream 842 843 Raises: 844 TypeError: if comparator is None 845 """ 846 self._ordered_execution = True 847 return self._from_self_config( 848 values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator)) 849 ) 850 851 def to_generator(self) -> Iterator[T]: 852 """ 853 Returns a generator based on Stream inputs and mapping 854 855 Note: If the Stream is sequential each value is processed when __next__() is called. Otherwise, if the Stream 856 uses a parallel configuration, all values are processed at the first __next__() call but each value is returned 857 one by one depending on the 'ordered/unordered' configuration 858 859 Note 2: When stream uses parallel with unordered configuration each value is returned when it is available. With 860 an ordered configuration the generator wait for each necessary value in order to respect the stream value order 861 862 Returns: 863 Iterator[T]: The Stream processed values generator 864 """ 865 # pylint: disable=E1102 866 values: Final[Iterable[Any]] = \ 867 self._values_function(self._parallel_config.or_else(None), self._ordered_execution) 868 if not values: 869 return 870 871 if self.is_parallel(): 872 yield from _parallel_generator( 873 values=values, 874 pipeline=self._pipeline, 875 parallel_config=self._parallel_config.get(), 876 ordered_execution=self._ordered_execution 877 ) 878 else: 879 yield from _sync_generator(values=values, pipeline=self._pipeline) 880 881 def _execute( 882 self, 883 stop_on_first_completed: bool = False, 884 collector: Collector[T, R] = _none_collector() 885 ) -> R: 886 """ 887 private method to execute the current Stream 888 """ 889 require_non_none(stop_on_first_completed) 890 require_non_none(collector) 891 # pylint: disable=E1102 892 values: Final[Optional[Iterable[Any]]] = \ 893 self._values_function(self._parallel_config.or_else(None), self._ordered_execution) 894 895 if not values: 896 return collector.supplier() 897 898 if self.is_parallel(): 899 return _parallel_execution( 900 values=values, 901 pipeline=self._pipeline, 902 parallel_config=self._parallel_config.get(), 903 ordered_execution=self._ordered_execution, 904 stop_on_first_completed=stop_on_first_completed, 905 collector=collector 906 ) 907 908 return _sync_execution( 909 values=values, pipeline=self._pipeline, stop_on_first_completed=stop_on_first_completed, collector=collector 910 ) 911 912 def _from_self_config( 913 self, values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] 914 ) -> Stream[R]: 915 """ 916 private method that returns a new Stream by copying the current configuration 917 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 918 """ 919 return Stream._of( 920 values_function=values_function, 921 parallel_config=self._parallel_config, 922 ordered_execution=self._ordered_execution 923 ) 924 925 def _init_to_values_function( 926 self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool 927 ) -> Stream[T]: 928 """ 929 private method that initializes the current stream with the given configuration 930 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 931 """ 932 return self.ordered_execution(ordered=ordered_execution).parallel(parallel_config) 933 934 def _sorted_values_function( 935 self, parallel_config: ParallelConfiguration, ordered_execution: bool, comparator: BiFunction[T, T, int] 936 ) -> Iterable[T]: 937 """ 938 private method used by `sorted` public method in replacement of lambda 939 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 940 """ 941 values: list[T] = self._init_to_values_function(parallel_config, ordered_execution).to_list() 942 values.sort(key=cmp_to_key(comparator)) 943 return values 944 945 def _flat_map_values_function( 946 self, parallel_config: Optional[ParallelConfiguration], 947 ordered_execution: bool, 948 mapper: Function[T, Stream[R]] 949 ) -> Iterable[R]: 950 """ 951 private method used by `flat_map` public method in replacement of lambda 952 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 953 """ 954 list_generator: Final[Iterable[list[R]]] = ( 955 self 956 ._init_to_values_function(parallel_config, ordered_execution) 957 .map(mapper) 958 .map(_stream_to_list_lambda) # type: ignore 959 .to_generator() 960 ) 961 for values in list_generator: 962 yield from values 963 964 def _distinct_values_function( 965 self, parallel_config: Optional[ParallelConfiguration], ordered_execution: bool 966 ) -> Iterable[T]: 967 """ 968 private method used by `distinct` public method in replacement of lambda 969 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 970 """ 971 existing_values: set[T] = set() 972 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 973 if not value in existing_values: 974 existing_values.add(value) 975 yield value 976 977 def _drop_while_values_function( 978 self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T] 979 ) -> Iterable[T]: 980 """ 981 private method used by `drop_while` public method in replacement of lambda 982 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 983 """ 984 drop: bool = True 985 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 986 drop = drop and predicate(value) 987 if drop is False: 988 yield value 989 990 def _limit_values_function( 991 self, parallel_config: ParallelConfiguration, ordered_execution: bool, limit: int 992 ) -> Iterable[T]: 993 """ 994 private method used by `limit` public method in replacement of lambda 995 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 996 """ 997 if limit <= 0: 998 return 999 1000 counter: int = 0 1001 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1002 yield value 1003 counter += 1 1004 if counter >= limit: 1005 break 1006 1007 def _skip_values_function( 1008 self, parallel_config: ParallelConfiguration, ordered_execution: bool, n: int 1009 ) -> Iterable[T]: 1010 """ 1011 private method used by `skip` public method in replacement of lambda 1012 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 1013 """ 1014 counter: int = 0 1015 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1016 if counter >= n: 1017 yield value 1018 counter += 1 1019 1020 def _take_while_values_function( 1021 self, parallel_config: ParallelConfiguration, ordered_execution: bool, predicate: Predicate[T] 1022 ) -> Iterable[T]: 1023 """ 1024 private method used by `take_while` public method in replacement of lambda 1025 Note: lambda are not used in order to be compatible with multiprocessing (lambda are not serializable) 1026 """ 1027 for value in self._init_to_values_function(parallel_config, ordered_execution).to_generator(): 1028 if not predicate(value): 1029 break # pragma: no mutate 1030 yield value
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
373 def __init__( # pylint: disable=R0913 374 self, 375 values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]], 376 pipeline: StepPipeline[V, T], 377 parallel_config: JOptional[ParallelConfiguration], 378 ordered_execution: bool, 379 create_key: object, 380 ): 381 """Stream private constructor: Constructs a Stream[T] inspired by java `java.util.stream.Stream<T>`""" 382 assert create_key == self.__CREATE_KEY, "Stream private constructor! Please use Stream.of" 383 self._values_function: BiFunction[Optional[ParallelConfiguration], bool, Iterable[V]] = \ 384 require_non_none(values_function) 385 self._pipeline: StepPipeline[V, T] = require_non_none(pipeline) 386 self._parallel_config: JOptional[ParallelConfiguration] = require_non_none(parallel_config) 387 self._ordered_execution: bool = require_non_none(ordered_execution)
Stream private constructor: Constructs a Stream[T] inspired by java java.util.stream.Stream<T>
305 @classmethod 306 def empty(cls) -> Stream[T]: 307 """ 308 Returns an empty sequential Stream 309 310 Returns: 311 Stream[T]: An empty sequential Stream 312 """ 313 return cls.of()
Returns an empty sequential Stream
Returns:
Stream[T]: An empty sequential Stream
315 @classmethod 316 def of(cls, *values: T) -> Stream[T]: 317 """ 318 Returns a sequential ordered stream whose elements are the specified values 319 320 Args: 321 *values: ordered stream elements 322 323 Returns: 324 Stream[T]: A sequential ordered stream whose elements are the specified values 325 """ 326 return cls.of_iterable(values)
Returns a sequential ordered stream whose elements are the specified values
Arguments:
- *values: ordered stream elements
Returns:
Stream[T]: A sequential ordered stream whose elements are the specified values
328 @classmethod 329 def of_iterable(cls, iterable: Iterable[T]) -> Stream[T]: 330 """ 331 Returns a sequential ordered stream whose elements are values from the given Iterable 332 333 Args: 334 iterable: iterable of stream source values 335 336 Returns: 337 Stream[T]: A sequential ordered stream whose elements are values from the given Iterable 338 339 Raises: 340 TypeError: if iterable is None 341 """ 342 require_non_none(iterable) 343 return cls._of(lambda p, o: iterable)
Returns a sequential ordered stream whose elements are values from the given Iterable
Arguments:
- iterable: iterable of stream source values
Returns:
Stream[T]: A sequential ordered stream whose elements are values from the given Iterable
Raises:
- TypeError: if iterable is None
389 def map(self, mapper: Function[T, R]) -> Stream[R]: 390 """ 391 Returns a stream consisting of the results of applying the given function to the elements of this stream 392 393 Args: 394 mapper (Function[T, R]): The function to apply to each element 395 396 Returns: 397 Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream 398 399 Raises: 400 TypeError: if mapper is None 401 """ 402 require_non_none(mapper) 403 return Stream._of( 404 values_function=self._values_function, 405 # pylint: disable=E1101 406 pipeline=self._pipeline.add_handler(cast(Function[T, StepResult[R]], partial(_map_lambda, mapper=mapper))), 407 parallel_config=self._parallel_config, 408 ordered_execution=self._ordered_execution 409 )
Returns a stream consisting of the results of applying the given function to the elements of this stream
Arguments:
- mapper (Function[T, R]): The function to apply to each element
Returns:
Stream[R]: A stream consisting of the results of applying the given function to the elements of this stream
Raises:
- TypeError: if mapper is None
411 def parallel(self, parallel_config: Optional[ParallelConfiguration]) -> Self: 412 """ 413 Configures the Stream to use the given parallel configuration and return the current stream 414 415 Note: None parallel configuration is like calling `sequential()` method 416 417 Args: 418 parallel_config: The parallel configuration to use 419 420 Returns: 421 Stream[T]: The current Stream after set parallel configuration 422 """ 423 self._parallel_config = JOptional.of_noneable(parallel_config) 424 return self
Configures the Stream to use the given parallel configuration and return the current stream
Note: None parallel configuration is like calling sequential()
method
Arguments:
- parallel_config: The parallel configuration to use
Returns:
Stream[T]: The current Stream after set parallel configuration
426 def is_parallel(self) -> bool: 427 """ 428 Returns whether this stream, if a terminal operation were to be executed, would execute in parallel 429 430 Returns: 431 bool: True if the Stream uses parallel configuration, otherwise False 432 """ 433 return self._parallel_config.is_present()
Returns whether this stream, if a terminal operation were to be executed, would execute in parallel
Returns:
bool: True if the Stream uses parallel configuration, otherwise False
435 def sequential(self) -> Self: 436 """ 437 Configures the Stream to use sequential configuration and return the current stream 438 (i.e.: Remove the parallel configuration if exists) 439 440 Returns: 441 Stream[T]: The current Stream after set sequential configuration 442 """ 443 if self.is_parallel(): 444 self._parallel_config = JOptional.empty() 445 return self
Configures the Stream to use sequential configuration and return the current stream (i.e.: Remove the parallel configuration if exists)
Returns:
Stream[T]: The current Stream after set sequential configuration
447 def filter(self, predicate: Predicate[T]) -> Stream[T]: 448 """ 449 Returns a stream consisting of the elements of this stream that match the given predicate 450 451 Args: 452 predicate: The predicate to apply to each element to determine if it should be included 453 454 Returns: 455 Stream[T]: A stream consisting of the elements of this stream that match the given predicate 456 457 Raises: 458 TypeError: if predicate is None 459 """ 460 return Stream._of( 461 values_function=self._values_function, 462 # pylint: disable=E1101 463 pipeline=self._pipeline.add_handler( 464 cast(Function[T, StepResult[T]], partial(_filter_lambda, predicate=require_non_none(predicate))) 465 ), 466 parallel_config=self._parallel_config, 467 ordered_execution=self._ordered_execution 468 )
Returns a stream consisting of the elements of this stream that match the given predicate
Arguments:
- predicate: The predicate to apply to each element to determine if it should be included
Returns:
Stream[T]: A stream consisting of the elements of this stream that match the given predicate
Raises:
- TypeError: if predicate is None
470 def collect(self, collector: Collector[T, R]) -> R: 471 """ 472 Performs a reduction operation on the elements of this stream using a Collector 473 474 Args: 475 collector: The Collector describing the reduction 476 477 Returns: 478 R: The result of the reduction 479 480 Raises: 481 TypeError: if collector is None 482 """ 483 return self._execute(collector=require_non_none(collector))
Performs a reduction operation on the elements of this stream using a Collector
Arguments:
- collector: The Collector describing the reduction
Returns:
R: The result of the reduction
Raises:
- TypeError: if collector is None
485 def collect_from(self, supplier: Supplier[R], accumulator: BiConsumer[R, T], combiner: BiConsumer[R, R]) -> R: 486 """ 487 Performs a reduction operation on the elements of this stream 488 489 Args: 490 supplier: A function that creates a new result container. For a parallel execution, this function may be 491 called multiple times and must return a fresh value each time 492 accumulator: A function that must fold an element into a result container 493 combiner: A unction that accepts two partial result containers and merges them, which must be compatible 494 with the accumulator function. The combiner function must fold the elements from the second result 495 container into the first result container 496 497 Returns: 498 R: The result of the reduction 499 500 Raises: 501 TypeError: if at least one parameter is None 502 """ 503 return self.collect(collector=collectors.of_biconsumers(supplier, accumulator, combiner))
Performs a reduction operation on the elements of this stream
Arguments:
- supplier: A function that creates a new result container. For a parallel execution, this function may be called multiple times and must return a fresh value each time
- accumulator: A function that must fold an element into a result container
- combiner: A unction that accepts two partial result containers and merges them, which must be compatible with the accumulator function. The combiner function must fold the elements from the second result container into the first result container
Returns:
R: The result of the reduction
Raises:
- TypeError: if at least one parameter is None
505 def to_list(self) -> list[T]: 506 """ 507 Accumulates the elements of this stream into a list 508 509 Returns: 510 list[T]: A list containing the stream elements 511 """ 512 return self.collect(collector=collectors.to_list())
Accumulates the elements of this stream into a list
Returns:
list[T]: A list containing the stream elements
514 def to_tuple(self) -> tuple[T, ...]: 515 """ 516 Accumulates the elements of this stream into a tuple 517 518 Returns: 519 tuple[T, ...]: a tuple containing the stream elements 520 """ 521 return self.collect(collector=collectors.to_tuple())
Accumulates the elements of this stream into a tuple
Returns:
tuple[T, ...]: a tuple containing the stream elements
523 def to_dict(self, key_mapper: Function[T, K], value_mapper: Function[T, V]) -> dict[K, V]: 524 """ 525 Accumulates the elements of this stream into a tuple dict whose keys and values are the result of applying the 526 provided mapping functions to the input elements 527 528 Args: 529 key_mapper: a mapping function to produce keys 530 value_mapper: a mapping function to produce values 531 532 Returns: 533 dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping 534 functions to the input elements 535 """ 536 return self.collect(collector=collectors.to_dict(key_mapper, value_mapper))
Accumulates the elements of this stream into a tuple dict whose keys and values are the result of applying the provided mapping functions to the input elements
Arguments:
- key_mapper: a mapping function to produce keys
- value_mapper: a mapping function to produce values
Returns:
dict[K, V]: a dict containing the stream elements whose keys and values are the result of applying mapping functions to the input elements
538 def all_match(self, predicate: Predicate[T]) -> bool: 539 """ 540 Returns whether all elements of this stream match the provided predicate 541 542 Args: 543 predicate: The predicate to apply to elements of this stream 544 545 Returns: 546 bool: True if either all elements of the stream match the provided predicate or the stream is empty, 547 otherwise False 548 549 Raises: 550 TypeError: if predicate is None 551 """ 552 return self.map(predicate).reduce(True, _and_lambda)
Returns whether all elements of this stream match the provided predicate
Arguments:
- predicate: The predicate to apply to elements of this stream
Returns:
bool: True if either all elements of the stream match the provided predicate or the stream is empty, otherwise False
Raises:
- TypeError: if predicate is None
554 def any_match(self, predicate: Predicate[T]) -> bool: 555 """ 556 Returns whether any elements of this stream match the provided predicate 557 558 Args: 559 predicate: The predicate to apply to elements of this stream 560 561 Returns: 562 bool: True if any elements of the stream match the provided predicate, otherwise False 563 564 Raises: 565 TypeError: if predicate is None 566 """ 567 return self.filter(predicate).find_any().is_present()
Returns whether any elements of this stream match the provided predicate
Arguments:
- predicate: The predicate to apply to elements of this stream
Returns:
bool: True if any elements of the stream match the provided predicate, otherwise False
Raises:
- TypeError: if predicate is None
569 def count(self) -> int: 570 """ 571 Returns the count of elements in this stream 572 573 Returns: 574 int: The count of elements in this stream 575 """ 576 return self.collect(collector=collectors.to_counter())
Returns the count of elements in this stream
Returns:
int: The count of elements in this stream
578 def for_each(self, consumer: Consumer[T]) -> None: 579 """ 580 Performs an action (/consumer) for each element of this stream 581 582 Args: 583 consumer: The Consumer to perform on the elements 584 585 Returns: 586 None: nothing 587 588 Raises: 589 TypeError: if consumer is None 590 """ 591 self.peek(require_non_none(consumer)) 592 self._execute()
Performs an action (/consumer) for each element of this stream
Arguments:
- consumer: The Consumer to perform on the elements
Returns:
None: nothing
Raises:
- TypeError: if consumer is None
594 def flat_map(self, mapper: Function[T, Stream[R]]) -> Stream[R]: 595 """ 596 Returns a stream consisting of the results of replacing each element of this stream with the contents of a 597 mapped stream produced by applying the provided mapping function to each element 598 599 Args: 600 mapper: A function to apply to each element which produces a stream of new values 601 602 Returns: 603 Stream[R]: The new Stream 604 605 Raises: 606 TypeError: if mapper is None 607 """ 608 return self._from_self_config( 609 values_function=cast( 610 BiFunction[Optional[ParallelConfiguration], bool, Iterable[R]], 611 partial(self._flat_map_values_function, mapper=require_non_none(mapper)) 612 ) 613 )
Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element
Arguments:
- mapper: A function to apply to each element which produces a stream of new values
Returns:
Stream[R]: The new Stream
Raises:
- TypeError: if mapper is None
615 def ordered_execution(self, ordered: bool = True) -> Stream[T]: 616 """ 617 Configures the current Stream to be ordered or not and returns it 618 619 Note: This configuration has no impact in sequential stream 620 621 Args: 622 ordered: True if the Stream must be ordered, False for unordered. (Default: True) 623 624 Returns: 625 Stream[T]: The current Stream with ordered configuration 626 627 Raises: 628 TypeError: if ordered is None 629 """ 630 return Stream._of( 631 values_function=self._values_function, 632 pipeline=self._pipeline, 633 parallel_config=self._parallel_config, 634 ordered_execution=ordered 635 ) if self._ordered_execution != require_non_none(ordered) else self
Configures the current Stream to be ordered or not and returns it
Note: This configuration has no impact in sequential stream
Arguments:
- ordered: True if the Stream must be ordered, False for unordered. (Default: True)
Returns:
Stream[T]: The current Stream with ordered configuration
Raises:
- TypeError: if ordered is None
637 def unordered(self) -> Stream[T]: 638 """ 639 Configures the current Stream to be unordered and returns it 640 641 Note: Is equivalent to call `ordered_execution(False)` 642 Note 2: This configuration has no impact in sequential stream 643 644 Returns: 645 Stream[T]: The current Stream with unordered configuration 646 """ 647 return self.ordered_execution(False)
Configures the current Stream to be unordered and returns it
Note: Is equivalent to call ordered_execution(False)
Note 2: This configuration has no impact in sequential stream
Returns:
Stream[T]: The current Stream with unordered configuration
649 def find_first(self) -> JOptional[T]: 650 """ 651 Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty 652 653 Returns: 654 JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream 655 is empty 656 """ 657 return self.ordered_execution(True).find_any()
Returns a JOptional describing the first element of this stream, or an empty JOptional if the stream is empty
Returns:
JOptional[T]: A JOptional describing the first element of this stream, or an empty JOptional if the stream is empty
659 def find_any(self) -> JOptional[T]: 660 """ 661 Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty 662 663 Note: if the stream is ordered, find_any and find_first are equivalents 664 665 Returns: 666 JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is 667 empty 668 """ 669 results: list[T] = self._execute( 670 stop_on_first_completed=True # pragma: no mutate 671 , collector=collectors.to_list() 672 ) 673 return JOptional.of_noneable(results[0] if results else None)
Returns a JOptional describing some element of the stream, or an empty JOptional if the stream is empty
Note: if the stream is ordered, find_any and find_first are equivalents
Returns:
JOptional[T]: A JOptional describing some element of the stream, or an empty JOptional if the stream is empty
675 def distinct(self) -> Stream[T]: 676 """ 677 Returns a stream consisting of the distinct elements of this stream 678 679 Returns: 680 Stream[T]: A stream consisting of the distinct elements of this stream 681 """ 682 return self._from_self_config(values_function=self._distinct_values_function)
Returns a stream consisting of the distinct elements of this stream
Returns:
Stream[T]: A stream consisting of the distinct elements of this stream
684 def drop_while(self, predicate: Predicate[T]) -> Stream[T]: 685 """ 686 Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping 687 the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a 688 stream consisting of the remaining elements of this stream after dropping a subset of elements that match the 689 given predicate. 690 691 Args: 692 predicate: The predicate to apply to elements to determine the longest prefix of elements 693 694 Returns: 695 Stream[T]: The new Stream 696 697 Raises: 698 TypeError: if predicate is None 699 """ 700 return self._from_self_config( 701 values_function=partial(self._drop_while_values_function, predicate=require_non_none(predicate)) 702 )
Returns, if this stream is ordered, a stream consisting of the remaining elements of this stream after dropping the longest prefix of elements that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of the remaining elements of this stream after dropping a subset of elements that match the given predicate.
Arguments:
- predicate: The predicate to apply to elements to determine the longest prefix of elements
Returns:
Stream[T]: The new Stream
Raises:
- TypeError: if predicate is None
704 def limit(self, limit: int) -> Stream[T]: 705 """ 706 Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length 707 708 Args: 709 limit: The number of elements the stream should be limited to 710 711 Returns: 712 Stream[T]: The truncated Stream 713 714 Raises: 715 TypeError: if limit is None 716 """ 717 return self._from_self_config( 718 values_function=partial(self._limit_values_function, limit=require_non_none(limit)))
Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length
Arguments:
- limit: The number of elements the stream should be limited to
Returns:
Stream[T]: The truncated Stream
Raises:
- TypeError: if limit is None
720 def max(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 721 """ 722 Returns the maximum element of this stream according to the provided comparator BiFunction 723 724 Args: 725 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 726 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 727 (default: natural comparator) 728 729 Returns: 730 JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream 731 is empty 732 733 Raises: 734 TypeError: if comparator is None 735 """ 736 return self.min(partial(_to_max_comparator, comparator=require_non_none(comparator)))
Returns the maximum element of this stream according to the provided comparator BiFunction
Arguments:
- comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:
JOptional[T]: A JOptional describing the maximum element of this stream, or an empty JOptional if the stream is empty
Raises:
- TypeError: if comparator is None
738 def min(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> JOptional[T]: 739 """ 740 Returns the minimum element of this stream according to the provided comparator BiFunction 741 742 Args: 743 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 744 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 745 (default: natural comparator) 746 747 Returns: 748 JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream 749 is empty 750 751 Raises: 752 TypeError: if comparator is None 753 """ 754 return self.sorted(require_non_none(comparator)).find_first()
Returns the minimum element of this stream according to the provided comparator BiFunction
Arguments:
- comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:
JOptional[T]: A JOptional describing the minimum element of this stream, or an empty JOptional if the stream is empty
Raises:
- TypeError: if comparator is None
756 def peek(self, consumer: Consumer[T]) -> Stream[T]: 757 """ 758 Returns a stream consisting of the elements of this stream, additionally performing the provided action 759 (/consumer) on each element as elements are consumed from the resulting stream 760 761 Args: 762 consumer: The Consumer to perform on the elements as they are consumed from the stream 763 764 Returns: 765 Stream[T]: The new Stream 766 767 Raises: 768 TypeError: if consumer is None 769 """ 770 return self.map(cast(Function[T, T], partial(_peek_mapper, consumer=require_non_none(consumer))))
Returns a stream consisting of the elements of this stream, additionally performing the provided action (/consumer) on each element as elements are consumed from the resulting stream
Arguments:
- consumer: The Consumer to perform on the elements as they are consumed from the stream
Returns:
Stream[T]: The new Stream
Raises:
- TypeError: if consumer is None
772 def reduce(self, identity: T, accumulator: BiFunction[T, T, T]) -> T: 773 """ 774 Performs a reduction on the elements of this stream, using the provided identity value and an associative 775 accumulation function, and returns the reduced value 776 777 Args: 778 identity: The identity value for the accumulating function 779 accumulator: The BiFunction for combining two values 780 781 Returns: 782 T: The result of the reduction 783 784 Raises: 785 TypeError: accumulator is None 786 """ 787 return self.collect(collectors.of(supplier=partial(to_self, obj=identity), 788 accumulator=accumulator, 789 combiner=accumulator 790 ) 791 )
Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value
Arguments:
- identity: The identity value for the accumulating function
- accumulator: The BiFunction for combining two values
Returns:
T: The result of the reduction
Raises:
- TypeError: accumulator is None
793 def skip(self, n: int) -> Stream[T]: 794 """ 795 Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of 796 the stream 797 798 Note: If this stream contains fewer than n elements then an empty stream will be returned 799 800 Args: 801 n: The number of leading elements to skip 802 803 Returns: 804 Stream[T]: The new stream 805 806 Raises: 807 TypeError: if n is None 808 809 """ 810 return self._from_self_config(values_function=partial(self._skip_values_function, n=require_non_none(n)))
Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream
Note: If this stream contains fewer than n elements then an empty stream will be returned
Arguments:
- n: The number of leading elements to skip
Returns:
Stream[T]: The new stream
Raises:
- TypeError: if n is None
812 def take_while(self, predicate: Predicate[T]) -> Stream[T]: 813 """ 814 Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream 815 that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset 816 of elements taken from this stream that match the given predicate 817 818 Args: 819 predicate: The predicate to apply to elements to determine the longest prefix of element 820 821 Returns: 822 Stream[T]: The new Stream 823 824 Raises: 825 TypeError: if predicate is None 826 """ 827 return self._from_self_config( 828 values_function=partial(self._take_while_values_function, predicate=require_non_none(predicate)) 829 )
Returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream that match the given predicate. Otherwise, if this stream is unordered, returns a stream consisting of a subset of elements taken from this stream that match the given predicate
Arguments:
- predicate: The predicate to apply to elements to determine the longest prefix of element
Returns:
Stream[T]: The new Stream
Raises:
- TypeError: if predicate is None
831 def sorted(self, comparator: BiFunction[T, T, int] = _natural_comparator) -> Stream[T]: 832 """ 833 Returns a stream consisting of the elements of this stream, sorted according to the provided comparator 834 835 Args: 836 comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, 837 zero, or a positive integer as the first argument is less than, equal to, or greater than the second) 838 (default: natural comparator) 839 840 Returns: 841 Stream[T]: The new Stream 842 843 Raises: 844 TypeError: if comparator is None 845 """ 846 self._ordered_execution = True 847 return self._from_self_config( 848 values_function=partial(self._sorted_values_function, comparator=require_non_none(comparator)) 849 )
Returns a stream consisting of the elements of this stream, sorted according to the provided comparator
Arguments:
- comparator: The comparator BiFunction to compare elements of this stream (Note: Returns a negative int, zero, or a positive integer as the first argument is less than, equal to, or greater than the second) (default: natural comparator)
Returns:
Stream[T]: The new Stream
Raises:
- TypeError: if comparator is None
851 def to_generator(self) -> Iterator[T]: 852 """ 853 Returns a generator based on Stream inputs and mapping 854 855 Note: If the Stream is sequential each value is processed when __next__() is called. Otherwise, if the Stream 856 uses a parallel configuration, all values are processed at the first __next__() call but each value is returned 857 one by one depending on the 'ordered/unordered' configuration 858 859 Note 2: When stream uses parallel with unordered configuration each value is returned when it is available. With 860 an ordered configuration the generator wait for each necessary value in order to respect the stream value order 861 862 Returns: 863 Iterator[T]: The Stream processed values generator 864 """ 865 # pylint: disable=E1102 866 values: Final[Iterable[Any]] = \ 867 self._values_function(self._parallel_config.or_else(None), self._ordered_execution) 868 if not values: 869 return 870 871 if self.is_parallel(): 872 yield from _parallel_generator( 873 values=values, 874 pipeline=self._pipeline, 875 parallel_config=self._parallel_config.get(), 876 ordered_execution=self._ordered_execution 877 ) 878 else: 879 yield from _sync_generator(values=values, pipeline=self._pipeline)
Returns a generator based on Stream inputs and mapping
Note: If the Stream is sequential each value is processed when __next__() is called. Otherwise, if the Stream uses a parallel configuration, all values are processed at the first __next__() call but each value is returned one by one depending on the 'ordered/unordered' configuration
Note 2: When stream uses parallel with unordered configuration each value is returned when it is available. With an ordered configuration the generator wait for each necessary value in order to respect the stream value order
Returns:
Iterator[T]: The Stream processed values generator