scheduler

pydantic model Scheduler[source]

Show JSON schema
{
   "title": "Scheduler",
   "type": "object",
   "properties": {
      "nodes": {
         "additionalProperties": {
            "$ref": "#/$defs/NodeSpecification"
         },
         "title": "Nodes",
         "type": "object"
      },
      "edges": {
         "items": {
            "$ref": "#/$defs/Edge"
         },
         "title": "Edges",
         "type": "array"
      },
      "source_nodes": {
         "items": {
            "type": "string"
         },
         "title": "Source Nodes",
         "type": "array"
      },
      "logger": {
         "default": null,
         "title": "Logger"
      }
   },
   "$defs": {
      "Edge": {
         "description": "Directed connection between an output slot a node and an input slot in another node",
         "properties": {
            "source_node": {
               "title": "Source Node",
               "type": "string"
            },
            "source_signal": {
               "title": "Source Signal",
               "type": "string"
            },
            "target_node": {
               "title": "Target Node",
               "type": "string"
            },
            "target_slot": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Target Slot"
            },
            "required": {
               "default": true,
               "title": "Required",
               "type": "boolean"
            }
         },
         "required": [
            "source_node",
            "source_signal",
            "target_node"
         ],
         "title": "Edge",
         "type": "object"
      },
      "NodeSpecification": {
         "description": "Specification for a single processing node within a tube .yaml file.",
         "properties": {
            "type": {
               "title": "Type",
               "type": "string"
            },
            "id": {
               "title": "Id",
               "type": "string"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "anyOf": [
                           {
                              "type": "string"
                           },
                           {
                              "additionalProperties": {
                                 "type": "string"
                              },
                              "maxProperties": 1,
                              "minProperties": 1,
                              "type": "object"
                           }
                        ]
                     },
                     "type": "array"
                  },
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "enabled": {
               "default": true,
               "title": "Enabled",
               "type": "boolean"
            },
            "stateful": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Stateful"
            },
            "description": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Description"
            }
         },
         "required": [
            "type",
            "id"
         ],
         "title": "NodeSpecification",
         "type": "object"
      }
   },
   "required": [
      "nodes",
      "edges"
   ]
}

Config:
  • arbitrary_types_allowed: bool = True

Fields:
Validators:
field edges: list[Edge] [Required]
Validated by:
field logger: Logger [Optional]
Validated by:
field nodes: dict[str, NodeSpecification] [Required]
Validated by:
field source_nodes: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]] [Optional]
Validated by:
classmethod from_specification(nodes: dict[str, NodeSpecification], edges: list[Edge]) Self[source]

Create an instance of a Scheduler from NodeSpecification and Edge

add_epoch(epoch: int | None = None) int[source]

Add another epoch with a prepared graph to the scheduler.

clear() None[source]

Remove epoch records, restarting the scheduler

disable_node(node_id: str) None[source]

Disable edges attached to the node and the NodeSpecification enable switches to False

done(epoch: int, node_id: str) MetaEvent | None[source]

Mark a node in a given epoch as done.

enable_node(node_id: str) None[source]

Enable edges attached to the node and the NodeSpecification enable switches to True

end_epoch(epoch: int | None = None) MetaEvent | None[source]
epoch_completed(epoch: int) bool[source]

Check if the epoch has been completed.

expire(epoch: int, node_id: str) MetaEvent | None[source]

Mark a node as having been completed without making its dependent nodes ready. i.e. when the node emitted NoEvent

generations() list[tuple[str, ...]][source]

Get the topological generations of the graph: tuples for each set of nodes that can be run at the same time.

Order within a generation is not guaranteed to be stable.

get_ready(epoch: int | None = None) list[MetaEvent][source]

Output the set of nodes that are ready across different epochs.

Parameters:

epoch (int | None) – if an int, get ready events for that epoch, if None , get ready events for all epochs.

validator get_sources  »  all fields[source]

Get the IDs of the nodes that do not depend on other nodes.

  • input nodes are special implicit source nodes. Other nodes

  • CAN depend on it and still be a source node.

has_cycle() bool[source]

Checks that the graph is acyclic.

is_active(epoch: int | None = None) bool[source]

Graph remains active while it holds at least one epoch that is active.

model_post_init(context: Any, /) None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

node_is_ready(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: int | None = None) bool[source]

Check if a single node is ready in a single or any epoch

Parameters:
  • node (NodeID) – the node to check

  • epoch (int | None) – the epoch to check, if None , any epoch

sources_finished(epoch: int | None = None) bool[source]

Check the source nodes of the given epoch have been processed. If epoch is None, check the source nodes of the latest epoch.

update(events: MutableSequence[Event | MetaEvent] | MutableSequence[Event]) MutableSequence[Event] | MutableSequence[Event | MetaEvent][source]

When a set of events are received, update the graphs within the scheduler. Currently only has TopoSorter.done() implemented.