store

Tube runners for running tubes

EventDict

A nested dictionary to store events for rapid access (vs. the old implementation which was just a big list to filter).

Stores by epoch, node_id, and signal, like events = {‘epoch’: {‘node_id’: {‘signal’: […]}}}

Should be made with a defaultdict to avoid annoying nested indexing problems

alias of dict[Annotated[int, Ge(ge=0)], dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], dict[Annotated[str, AfterValidator(func=_is_identifier)], list[Event]]]]

class EventStore(events: dict[~typing.Annotated[int, ~annotated_types.Ge(ge=0)], dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier), ~pydantic.functional_validators.AfterValidator(func=~noob.types._not_reserved)], dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier)], list[~noob.event.Event]]]] = <factory>, counter: ~itertools.count = <factory>, _event_condition: ~threading.Condition = <factory>)[source]

Container class for storing and retrieving events by node and slot

events: dict[Annotated[int, Ge(ge=0)], dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], dict[Annotated[str, AfterValidator(func=_is_identifier)], list[Event]]]]
counter: count
property flat_events: list[Event]

Flattened list of events in the store

add(event: Event) Event[source]

Add an existing event to the store, returning it.

Mostly an abstraction layer to give ourselves room above the events list in cast we want to change the internal implementation of how events are stored

add_value(signals: list[Signal], value: Any, node_id: str, epoch: int) list[Event][source]

Add the result of a Node.process() call to the event store.

Split the dictionary of values into separate Event s, store along with current timestamp

Parameters:
  • signals (list[Signal]) – Signals from which the value was emitted by a Node.process() call

  • value (Any) – Value emitted by a Node.process() call. Gets wrapped with a list in case the length of signals is 1. Otherwise, it’s zipped with :signals:

  • node_id (str) – ID of the node that emitted the events

  • epoch (int) – Epoch count that the signal was emitted in

get(node_id: str, signal: str, epoch: int) Event[source]

Get the event with the matching node_id and signal name from a given epoch.

If epoch is -1, return the most recent event.

Raises:

KeyError – if no event with the matching node_id and signal name exists

collect(edges: list[Edge], epoch: int) dict | None[source]

Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges() ).

If none of the requested events have been emitted, return None.

If all of the requested events have been emitted, return a kwarg-like dict

If some of the requested events are missing but others are present, exclude the keys from the returned dict (None is a valid value for an event, so if a key is present and the value is None, the event was emitted with a value of None)

If epoch is -1, get the the events from the most recent epoch where all events are present, and if no epochs are present with a full set of events, return None

Todo

Add an example

collect_events(edges: list[Edge], epoch: int) list[Event] | None[source]

Collect the event objects from a set of dependencies indicated by edges in a given epoch.

If none of the requested events are present, return None

If some of the requested events but others are present, return an incomplete list.

Parameters:
  • edges (list[Edge]) – List of edges from which to collect events

  • epoch (int) – Epoch to select from, if -1, get the latest complete set of events.

clear(epoch: int | None = None) None[source]

Clear events for a specific or all epochs.

Does not reset the counter (to continue giving unique ids to the next round’s events)

static transform_events(edges: list[Edge], events: list[Event]) dict[source]

Transform the values of a set of events to a dict that can be consumed by the target node’s process method.

i.e.: return a dictionary whose keys are the target_signal s of the edges

using the value of the matching event.

static split_args_kwargs(inputs: dict) tuple[tuple, dict][source]
iter() Generator[Event, None, None][source]

Iterate through all events