Source code for noob.node.tube

import warnings
from typing import TYPE_CHECKING, Any, Union

from noob.exceptions import ExtraInputWarning
from noob.node.base import Node, Slot
from noob.types import ConfigSource, RunnerContext

if TYPE_CHECKING:
    from noob.runner import TubeRunner
    from noob.tube import Tube, TubeSpecification


[docs] class TubeNode(Node): """ A node that contains another tube within it """ tube: ConfigSource _tube: Union["Tube", None] = None _tube_spec: Union["TubeSpecification", None] = None _runner: Union["TubeRunner", None] = None @property def tube_spec(self) -> "TubeSpecification": from noob.tube import TubeSpecification if self._tube_spec is None: self._tube_spec = TubeSpecification.from_any(self.tube) return self._tube_spec # TODO: support dependency injected inits in plugin
[docs] def init(self, context: RunnerContext) -> None: # type: ignore[override] from noob import SynchronousRunner, Tube with warnings.catch_warnings(action="ignore", category=ExtraInputWarning): self._tube = Tube.from_specification( self.tube, input={**context["tube"].input_collection.chain} ) self._runner = SynchronousRunner(tube=self._tube) self._runner.init()
[docs] def deinit(self) -> None: if self._runner is not None: self._runner.deinit()
[docs] def process(self, **kwargs: Any) -> Any: if self._runner is None: raise RuntimeError( "TubeNode must be initialized within a Runner " "to receive the outer runner's context. " "It doesn't make sense to run a TubeNode on its own." ) return self._runner.process(**kwargs)
def _collect_slots(self) -> dict[str, Slot]: from noob.input import InputScope slots = {} for in_key, in_val in self.tube_spec.input.items(): if in_val.scope == InputScope.process: slots[in_key] = Slot(name=in_key, annotation=Any) return slots