store¶
Tube runners for running tubes
- EventDict¶
A nested dictionary to store events for rapid access (vs. the old implementation which was just a big list to filter).
Stores by epoch, node_id, and signal, like events = {‘epoch’: {‘node_id’: {‘signal’: […]}}}
Should be made with a defaultdict to avoid annoying nested indexing problems
alias of
dict[Annotated[int,Ge(ge=0)],dict[Annotated[str,AfterValidator(func=_is_identifier),AfterValidator(func=_not_reserved)],dict[Annotated[str,AfterValidator(func=_is_identifier)],list[Event]]]]
- class EventStore(events: dict[~typing.Annotated[int, ~annotated_types.Ge(ge=0)], dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier), ~pydantic.functional_validators.AfterValidator(func=~noob.types._not_reserved)], dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier)], list[~noob.event.Event]]]] = <factory>, counter: ~itertools.count = <factory>, _event_condition: ~threading.Condition = <factory>)[source]¶
Container class for storing and retrieving events by node and slot
- events: dict[Annotated[int, Ge(ge=0)], dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], dict[Annotated[str, AfterValidator(func=_is_identifier)], list[Event]]]]¶
- counter: count¶
- add(event: Event) Event[source]¶
Add an existing event to the store, returning it.
Mostly an abstraction layer to give ourselves room above the events list in cast we want to change the internal implementation of how events are stored
- add_value(signals: list[Signal], value: Any, node_id: str, epoch: int) list[Event][source]¶
Add the result of a
Node.process()call to the event store.Split the dictionary of values into separate
Events, store along with current timestamp- Parameters:
signals (
list[Signal]) – Signals from which the value was emitted by aNode.process()callvalue (
Any) – Value emitted by aNode.process()call. Gets wrapped with a list in case the length of signals is 1. Otherwise, it’s zipped with :signals:node_id (
str) – ID of the node that emitted the eventsepoch (
int) – Epoch count that the signal was emitted in
- get(node_id: str, signal: str, epoch: int) Event[source]¶
Get the event with the matching node_id and signal name from a given epoch.
If epoch is -1, return the most recent event.
- Raises:
KeyError – if no event with the matching node_id and signal name exists
- collect(edges: list[Edge], epoch: int) dict | None[source]¶
Gather events into a form that can be consumed by a
Node.process()method, given the collection of inbound edges (usually fromTube.in_edges()).If none of the requested events have been emitted, return
None.If all of the requested events have been emitted, return a kwarg-like dict
If some of the requested events are missing but others are present, exclude the keys from the returned dict (None is a valid value for an event, so if a key is present and the value is None, the event was emitted with a value of None)
If epoch is -1, get the the events from the most recent epoch where all events are present, and if no epochs are present with a full set of events, return None
Todo
Add an example
- collect_events(edges: list[Edge], epoch: int) list[Event] | None[source]¶
Collect the event objects from a set of dependencies indicated by edges in a given epoch.
If none of the requested events are present, return None
If some of the requested events but others are present, return an incomplete list.
- Parameters:
edges (
list[Edge]) – List of edges from which to collect eventsepoch (
int) – Epoch to select from, if -1, get the latest complete set of events.
- clear(epoch: int | None = None) None[source]¶
Clear events for a specific or all epochs.
Does not reset the counter (to continue giving unique ids to the next round’s events)