Source code for noob.node.gather

from multiprocessing import Lock
from multiprocessing.synchronize import Lock as LockType
from typing import Any, Generic, TypeVar

from pydantic import PrivateAttr

from noob.event import MetaSignal
from noob.node.base import Node

_TInput = TypeVar("_TInput")


[docs] class Gather(Node, Generic[_TInput]): """ Cardinality reduction. Given a node that emits >1 events, gather them into a single iterable. Two (mutually exclusive) modes: - gather a fixed number of events .. code-block:: yaml nodename: type: gather params: n: 5 depends: - value: othernode.signal - gather events until a trigger is received .. code-block:: yaml nodename: type: gather depends: - value: othernode.signal_1 - trigger: thirdnode.signal_2 """ n: int | None = None _items: list[_TInput] = PrivateAttr(default_factory=list) _lock: LockType = PrivateAttr(default_factory=Lock)
[docs] def process(self, value: _TInput, trigger: Any | None = None) -> list[_TInput] | MetaSignal: """Collect value in a list, emit if `n` is met or `trigger` is present""" if trigger is not None and self.n is not None: raise ValueError("Cannot use trigger mode while `n` is set") with self._lock: self._items.append(value) if self._should_return(trigger): try: return self._items finally: # clear list after returning self._items = [] return MetaSignal.NoEvent
def _should_return(self, trigger: Any | None) -> bool: return (self.n is not None and len(self._items) >= self.n) or ( self.n is None and trigger is not None )