scheduler
- pydantic model Scheduler
JSON schema:
{ "title": "Scheduler", "type": "object", "properties": { "nodes": { "additionalProperties": { "$ref": "#/$defs/NodeSpecification" }, "title": "Nodes", "type": "object" }, "edges": { "items": { "$ref": "#/$defs/Edge" }, "title": "Edges", "type": "array" }, "source_nodes": { "items": { "type": "string" }, "title": "Source Nodes", "type": "array" }, "logger": { "default": null, "title": "Logger" } }, "$defs": { "Edge": { "description": "Directed connection between an output slot a node and an input slot in another node", "properties": { "source_node": { "title": "Source Node", "type": "string" }, "source_signal": { "title": "Source Signal", "type": "string" }, "target_node": { "title": "Target Node", "type": "string" }, "target_slot": { "anyOf": [ { "type": "string" }, { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Target Slot" }, "required": { "default": true, "title": "Required", "type": "boolean" } }, "required": [ "source_node", "source_signal", "target_node" ], "title": "Edge", "type": "object" }, "NodeSpecification": { "description": "Specification for a single processing node within a tube .yaml file.", "properties": { "type": { "title": "Type", "type": "string" }, "id": { "title": "Id", "type": "string" }, "depends": { "anyOf": [ { "items": { "anyOf": [ { "type": "string" }, { "additionalProperties": { "type": "string" }, "maxProperties": 1, "minProperties": 1, "type": "object" } ] }, "type": "array" }, { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "enabled": { "default": true, "title": "Enabled", "type": "boolean" }, "stateful": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "title": "Stateful" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "type", 
"id" ], "title": "NodeSpecification", "type": "object" } }, "required": [ "nodes", "edges" ] }
- Config:
arbitrary_types_allowed: bool = True
- Fields:
- Validators:
get_sources » all fields
- field nodes: dict[str, NodeSpecification] [Required]
- Validated by:
- field source_nodes: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]] [Optional]
- Validated by:
- classmethod from_specification(nodes: dict[str, NodeSpecification], edges: list[Edge]) → Self
Create a Scheduler instance from NodeSpecification and Edge objects.
- add_epoch(epoch: int | None = None) → int
Add another epoch with a prepared graph to the scheduler.
- disable_node(node_id: str) → None
Disable the edges attached to the node and set the enabled switch on its NodeSpecification to False.
- enable_node(node_id: str) → None
Enable the edges attached to the node and set the enabled switch on its NodeSpecification to True.
- expire(epoch: int, node_id: str) → MetaEvent | None
Mark a node as completed without making its dependent nodes ready, e.g. when the node emitted NoEvent.
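The difference between completing a node and expiring it can be illustrated with a toy dependency tracker. This is only a sketch of the idea described above, not the Scheduler's implementation: the class `ToyEpoch` and its methods are hypothetical names invented for illustration.

```python
class ToyEpoch:
    """Hypothetical stand-in for one epoch's dependency graph."""

    def __init__(self, deps: dict[str, set[str]]) -> None:
        # deps maps each node to the set of nodes it is still waiting on
        self._waiting = {node: set(preds) for node, preds in deps.items()}
        self._finished: set[str] = set()

    def ready(self) -> list[str]:
        # A node is ready when it waits on nothing and has not finished
        return sorted(
            node
            for node, preds in self._waiting.items()
            if not preds and node not in self._finished
        )

    def done(self, node: str) -> None:
        # Normal completion: release every node that was waiting on this one
        self._finished.add(node)
        for preds in self._waiting.values():
            preds.discard(node)

    def expire(self, node: str) -> None:
        # Completion without release: dependents keep waiting on this node
        self._finished.add(node)


expired = ToyEpoch({"a": set(), "b": {"a"}})
expired.expire("a")

completed = ToyEpoch({"a": set(), "b": {"a"}})
completed.done("a")
```

After `expire("a")`, node `b` never becomes ready in that epoch, whereas after `done("a")` it does.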
- generations() → list[tuple[str, ...]]
Get the topological generations of the graph: tuples for each set of nodes that can be run at the same time.
Order within a generation is not guaranteed to be stable.
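As a rough illustration of what topological generations look like, the sketch below computes them with the standard library's `graphlib.TopologicalSorter`. It is an assumption that the Scheduler works along similar lines; the helper name `toy_generations` and the example graph are hypothetical.

```python
from graphlib import TopologicalSorter


def toy_generations(deps: dict[str, set[str]]) -> list[tuple[str, ...]]:
    """Group nodes into batches that could run at the same time.

    deps maps each node to the set of nodes it depends on.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorter.get_ready()
        # Sorted here only to make the example deterministic; the order
        # within a generation carries no meaning.
        result.append(tuple(sorted(ready)))
        sorter.done(*ready)
    return result


# "b" and "c" both depend on "a"; "d" depends on both "b" and "c"
example = toy_generations({"b": {"a"}, "c": {"a"}, "d": {"b", "c"}})
# example == [("a",), ("b", "c"), ("d",)]
```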
- get_ready(epoch: int | None = None) → list[MetaEvent]
Output the set of nodes that are ready across different epochs.
- Parameters:
epoch (int | None) – if an int, get ready events for that epoch; if None, get ready events for all epochs.
- validator get_sources » all fields
Get the IDs of the nodes that do not depend on other nodes.
Input nodes are special implicit source nodes: a node can depend on an input node and still count as a source node.
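The source-node rule can be sketched as a filter over the edge list. This is a simplified illustration, not the validator itself: the `Edge` dataclass below mirrors only the required fields of the Edge schema, and the reserved ID `"input"` and the helper `find_sources` are assumptions made for the example.

```python
from dataclasses import dataclass

INPUT_ID = "input"  # assumption: the ID used for the implicit input node


@dataclass(frozen=True)
class Edge:
    """Simplified stand-in with the required fields of the Edge schema."""

    source_node: str
    source_signal: str
    target_node: str


def find_sources(node_ids: list[str], edges: list[Edge]) -> list[str]:
    """Return the nodes with no upstream dependency other than the input node."""
    # Edges coming from the input node do not count as a dependency
    has_upstream = {e.target_node for e in edges if e.source_node != INPUT_ID}
    return [node for node in node_ids if node not in has_upstream]


edges = [
    Edge(source_node="input", source_signal="value", target_node="a"),
    Edge(source_node="a", source_signal="value", target_node="b"),
]
sources = find_sources(["a", "b", "c"], edges)
# "a" depends only on the input node and "c" has no edges, so both are sources
```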
- is_active(epoch: int | None = None) → bool
Graph remains active while it holds at least one epoch that is active.
- model_post_init(context: Any, /) → None
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.
- node_is_ready(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: int | None = None) → bool
Check if a single node is ready in a single epoch or in any epoch.
- Parameters:
node (NodeID) – the node to check
epoch (int | None) – the epoch to check; if None, any epoch
- sources_finished(epoch: int | None = None) → bool
Check whether the source nodes of the given epoch have been processed. If epoch is None, check the source nodes of the latest epoch.
- update(events: MutableSequence[Event | MetaEvent] | MutableSequence[Event]) → MutableSequence[Event] | MutableSequence[Event | MetaEvent]
When a set of events is received, update the graphs within the scheduler. Currently only TopoSorter.done() is implemented.