Source code for noob.runner.sync
from __future__ import annotations
from dataclasses import dataclass
from threading import Event as ThreadEvent
from typing import Any
from noob.asset import AssetScope
from noob.event import MetaEvent
from noob.node import Return
from noob.runner.base import TubeRunner
from noob.scheduler import Scheduler
from noob.types import ReturnNodeType
[docs]
@dataclass
class SynchronousRunner(TubeRunner):
    """
    Simple, synchronous tube runner.

    Just run the nodes in topological order and return from return nodes.
    """

    def __post_init__(self):
        super().__post_init__()
        # Set by init(), cleared by deinit(); backs the ``running`` property.
        self._running = ThreadEvent()

    def init(self) -> None:
        """
        Start processing data with the tube graph.

        Calls ``init`` on every enabled node and then on the tube state
        (with ``AssetScope.runner``). Does nothing if already running.
        """
        # TODO: lock for re-entry
        if self._running.is_set():
            # fine!
            return

        self._running.set()
        for node in self.tube.enabled_nodes.values():
            self.inject_context(node.init)()
        self.inject_context(self.tube.state.init)(AssetScope.runner)

    def deinit(self) -> None:
        """
        Stop all nodes processing.

        Calls ``deinit`` on every enabled node and then on the tube state
        (with ``AssetScope.runner``), and finally clears the running flag.
        """
        # TODO: lock to ensure we've been started
        for node in self.tube.enabled_nodes.values():
            self.inject_context(node.deinit)()
        self.inject_context(self.tube.state.deinit)(AssetScope.runner)
        self._running.clear()

    @property
    def running(self) -> bool:
        """Whether the tube is currently running"""
        return self._running.is_set()

    def process(self, **kwargs: Any) -> ReturnNodeType:
        """
        Iterate through nodes in topological order,
        synchronously calling their process method and passing events as they are emitted.

        Process-scoped ``input`` s can be passed as kwargs.
        """
        # mostly overriding to set the docstring
        return super().process(**kwargs)

    def _before_process(self) -> None:
        """
        Clear the eventstore and add a new epoch.

        Initialize if not already done.
        """
        self.store.clear()
        self.tube.scheduler.add_epoch()
        if not self._running.is_set():
            self.init()

    def _filter_ready(self, nodes: list[MetaEvent], scheduler: Scheduler) -> list[MetaEvent]:
        """
        Drop the ``"assets"`` and ``"input"`` entries from ``nodes``.

        Those entries are marked done on ``scheduler`` right away (using the
        entry's own epoch); every other entry is returned in its original order.
        """
        # graph autogenerates "assets" and "input" nodes if something depends on it
        # but in the sync runner we always have assets and inputs handy
        evts = []
        for node in nodes:
            if node["value"] in ("assets", "input"):
                scheduler.done(node["epoch"], node["value"])
            else:
                evts.append(node)
        return evts

    def enable_node(self, node_id: str) -> None:
        """Initialize the node registered under ``node_id``, then enable it in the tube."""
        node = self.tube.nodes[node_id]
        # Go through inject_context, exactly as init() does for enabled nodes,
        # so a node enabled at runtime is initialized the same way.
        self.inject_context(node.init)()
        self.tube.enable_node(node_id)

    def disable_node(self, node_id: str) -> None:
        """Deinitialize the node registered under ``node_id``, then disable it in the tube."""
        node = self.tube.nodes[node_id]
        # Mirror deinit(): wrap the call with inject_context for consistency.
        self.inject_context(node.deinit)()
        self.tube.disable_node(node_id)

    def collect_return(self, epoch: int | None = None) -> ReturnNodeType:
        """
        The return node holds values from a single epoch, get and transform them.

        Only the first enabled ``Return`` node is consulted; its values are
        fetched with ``keep=False``. Returns ``None`` when no ``Return`` node
        is enabled.

        Raises:
            ValueError: if ``epoch`` is given, since this runner only stores
                a single epoch at a time.
        """
        if epoch is not None:
            raise ValueError("Sync runner only stores a single epoch at a time")

        ret_node = next(
            (n for n in self.tube.enabled_nodes.values() if isinstance(n, Return)),
            None,
        )
        if ret_node is None:
            return None
        return ret_node.get(keep=False)