tube¶
- pydantic model TubeSpecification[source]¶
Configuration for the nodes within a tube.
Representation of the YAML form of a tube. Converted to the runtime form with Tube.from_specification().
Little, if any, validation of the tube as a whole is performed here beyond checking that the nodes have the correct fields. Type mismatches and dependencies on signals that don’t exist are not caught, because detecting them requires importing and introspecting the specified node classes, which should only happen when the tube is instantiated. This class is just a carrier for the YAML spec.
Show JSON schema
{ "title": "TubeSpecification", "description": "Configuration for the nodes within a tube.\n\nRepresentation of the yaml-form of a tube.\nConverted to the runtime-form with :meth:`.Tube.from_specification`.\n\nNot much, if any validation is performed here on the whole tube except\nthat the nodes have the correct fields, ignoring validity of\ne.g. type mismatches or dependencies on signals that don't exist.\nThose require importing and introspecting the specified node classes,\nwhich should only happen when we try and instantiate the tube -\nthis class is just a carrier for the yaml spec.", "type": "object", "properties": { "noob_id": { "anyOf": [ { "pattern": "[\\w\\-\\/#]+", "type": "string" }, { "type": "null" } ], "default": null, "title": "Noob Id" }, "noob_model": { "default": null, "title": "Noob Model", "type": "string" }, "noob_version": { "default": "0.1.2.dev232+g3ee3578", "title": "Noob Version", "type": "string" }, "assets": { "additionalProperties": { "$ref": "#/$defs/AssetSpecification" }, "title": "Assets", "type": "object" }, "input": { "additionalProperties": { "$ref": "#/$defs/InputSpecification" }, "title": "Input", "type": "object" }, "nodes": { "additionalProperties": { "$ref": "#/$defs/NodeSpecification" }, "title": "Nodes", "type": "object" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "$defs": { "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", "type": "string" }, "AssetSpecification": { "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": 
"Depends" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "id", "type", "scope" ], "title": "AssetSpecification", "type": "object" }, "InputScope": { "description": "The scope that input must be provided in", "enum": [ "tube", "process" ], "title": "InputScope", "type": "string" }, "InputSpecification": { "description": "Specification of inputs to a noob tube.\n\nInputs can be supplied at different times and frequencies,\nas specified by `scope`:\n\n- `tube`: When instantiating the tube\n- `process`: Per call to :meth:`.TubeRunner.process`\n\n`tube`-scoped inputs may be used in a node's `param` specification,\nand `process`-scoped inputs may be used as one of a node's `depends`.\n\nInputs can be supplied at a \"higher\" scope and be accessed by lower scopes:\ne.g. input requested with a process scope can use input provided when instantiating the tube,\nif not provided to process but provided to the tube.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/InputScope", "default": "tube" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "id", "type" ], "title": "InputSpecification", "type": "object" }, "NodeSpecification": { "description": "Specification for a single processing node within a tube .yaml file.", "properties": { "type": { "title": "Type", "type": "string" }, "id": { "title": "Id", "type": "string" }, "depends": { "anyOf": [ { "items": { "anyOf": [ { "type": "string" }, { "additionalProperties": { "type": "string" }, "maxProperties": 1, "minProperties": 1, "type": "object" } ] }, "type": "array" }, { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": 
"Params" }, "enabled": { "default": true, "title": "Enabled", "type": "boolean" }, "stateful": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "title": "Stateful" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "type", "id" ], "title": "NodeSpecification", "type": "object" } } }
- Config:
validate_default: bool = True
- Fields:
- Validators:
dependencies_exist » all fields
- field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], AssetSpecification] [Optional]¶
The specs of the assets that comprise the state of this tube
- Validated by:
dependencies_exist
fill_node_ids
- field input: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], InputSpecification] [Optional]¶
Inputs provided at runtime
- Validated by:
dependencies_exist
fill_node_ids
- field nodes: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], NodeSpecification] [Optional]¶
The nodes that this tube configures
- Validated by:
dependencies_exist
fill_node_ids
- validator fill_node_ids » nodes, assets, input[source]¶
Roll down the id from the key in the nodes dictionary into the node config
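The roll-down described above can be sketched on plain dictionaries. This is a minimal illustration, not the library's implementation: the helper name `fill_ids`, the example type strings, and the choice to leave an explicitly set `id` untouched are all assumptions.

```python
from copy import deepcopy


def fill_ids(spec: dict) -> dict:
    """Copy each mapping key into its entry's ``id`` field.

    Hypothetical stand-in for the validator: it works on plain dicts
    rather than on the pydantic model.
    """
    filled = deepcopy(spec)
    for section in ("nodes", "assets", "input"):
        for key, entry in (filled.get(section) or {}).items():
            # Assumption: only a missing id is filled in.
            entry.setdefault("id", key)
    return filled


raw = {
    "nodes": {
        "a": {"type": "package.module.source"},  # hypothetical type path
        "b": {"type": "package.module.sink"},  # hypothetical type path
    },
    "input": {"start": {"type": "int", "scope": "tube"}},
}
filled = fill_ids(raw)
```

After the call, `filled["nodes"]["a"]["id"]` is `"a"`, so the YAML author does not have to repeat the key inside each entry.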
- validator dependencies_exist » all fields[source]¶
Nodes referred to in dependencies should exist; they must be:
- other nodes in the spec
- input (and the input signal must exist)
- assets (and the asset must exist)
Note: we do not check the validity of signals here, as that would require us to import/inspect the code object referred to by the node, and we want specifications to be loadable even if the referred code is not present and installed in the environment.
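A rough sketch of this kind of existence check, written against plain dictionaries. Several details are assumptions made for illustration only: that a dependency reference is a string of the form `<source>.<signal>`, that the literal sources `input` and `assets` refer to the spec's `input` and `assets` mappings, and that the value of a single-key mapping in `depends` is the reference. The helper names are hypothetical.

```python
def iter_dependency_refs(depends):
    """Yield reference strings from the shapes the schema allows for ``depends``."""
    if depends is None:
        return
    if isinstance(depends, str):
        yield depends
        return
    for item in depends:
        if isinstance(item, str):
            yield item
        else:
            # Single-key mapping; assumed to map a slot name to a reference.
            yield from item.values()


def missing_dependencies(spec: dict) -> list[str]:
    """Return a description of every dependency whose source is not declared."""
    nodes = spec.get("nodes", {})
    inputs = spec.get("input", {})
    assets = spec.get("assets", {})
    missing = []
    for node_id, node in nodes.items():
        for ref in iter_dependency_refs(node.get("depends")):
            source, _, signal = ref.partition(".")
            if source == "input":
                ok = signal in inputs
            elif source == "assets":
                ok = signal in assets
            else:
                # Signals of ordinary nodes are not checked, only the node id.
                ok = source in nodes
            if not ok:
                missing.append(f"{node_id} -> {ref}")
    return missing


spec = {
    "input": {"start": {"type": "int"}},
    "assets": {},
    "nodes": {
        "a": {"depends": "input.start"},
        "b": {"depends": [{"value": "a.anything"}, "c.value"]},
        "d": {"depends": ["assets.db", "input.nope"]},
    },
}
missing = missing_dependencies(spec)
```

Note that `a.anything` passes even though nothing confirms that node `a` emits such a signal, which mirrors the note above: signal validity is left to instantiation time.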
- pydantic model Tube[source]¶
A graph of nodes transforming some input source(s) to some output sink(s)
The Tube model is a container for a set of nodes that are fully instantiated (e.g. have their “passed” and “fill” keys processed) and connected. It does not handle running the tube – that is handled by a TubeRunner.
Show JSON schema
{ "title": "Tube", "type": "object", "properties": { "tube_id": { "title": "Tube Id", "type": "string" }, "nodes": { "additionalProperties": { "$ref": "#/$defs/Node" }, "title": "Nodes", "type": "object" }, "edges": { "items": { "$ref": "#/$defs/Edge" }, "title": "Edges", "type": "array" }, "input": { "additionalProperties": true, "title": "Input", "type": "object" }, "input_collection": { "$ref": "#/$defs/InputCollection" }, "state": { "$ref": "#/$defs/State" }, "scheduler": { "default": null, "title": "Scheduler" } }, "$defs": { "Asset": { "additionalProperties": false, "description": "An asset within a processing tube.", "properties": { "id": { "title": "Id", "type": "string" }, "spec": { "$ref": "#/$defs/AssetSpecification" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "additionalProperties": true, "title": "Params", "type": "object" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Depends" }, "obj": { "anyOf": [ {}, { "type": "null" } ], "default": null, "title": "Obj" }, "stored_at": { "default": -1, "title": "Stored At", "type": "integer" } }, "required": [ "id", "spec", "scope", "depends" ], "title": "Asset", "type": "object" }, "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", "type": "string" }, "AssetSpecification": { "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "id", "type", "scope" ], "title": "AssetSpecification", "type": "object" }, 
"Edge": { "description": "Directed connection between an output slot a node and an input slot in another node", "properties": { "source_node": { "title": "Source Node", "type": "string" }, "source_signal": { "title": "Source Signal", "type": "string" }, "target_node": { "title": "Target Node", "type": "string" }, "target_slot": { "anyOf": [ { "type": "string" }, { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Target Slot" }, "required": { "default": true, "title": "Required", "type": "boolean" } }, "required": [ "source_node", "source_signal", "target_node" ], "title": "Edge", "type": "object" }, "InputCollection": { "description": "A collection of input specifications used during runtime, split by scope,\nto validate presence of and to combine inputs.", "properties": { "specs": { "additionalProperties": { "additionalProperties": { "$ref": "#/$defs/InputSpecification" }, "type": "object" }, "propertyNames": { "$ref": "#/$defs/InputScope" }, "title": "Specs", "type": "object" } }, "title": "InputCollection", "type": "object" }, "InputScope": { "description": "The scope that input must be provided in", "enum": [ "tube", "process" ], "title": "InputScope", "type": "string" }, "InputSpecification": { "description": "Specification of inputs to a noob tube.\n\nInputs can be supplied at different times and frequencies,\nas specified by `scope`:\n\n- `tube`: When instantiating the tube\n- `process`: Per call to :meth:`.TubeRunner.process`\n\n`tube`-scoped inputs may be used in a node's `param` specification,\nand `process`-scoped inputs may be used as one of a node's `depends`.\n\nInputs can be supplied at a \"higher\" scope and be accessed by lower scopes:\ne.g. 
input requested with a process scope can use input provided when instantiating the tube,\nif not provided to process but provided to the tube.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/InputScope", "default": "tube" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "id", "type" ], "title": "InputSpecification", "type": "object" }, "Node": { "additionalProperties": false, "description": "A node within a processing tube", "properties": { "id": { "title": "Id", "type": "string" }, "spec": { "anyOf": [ { "$ref": "#/$defs/NodeSpecification" }, { "type": "null" } ], "default": null }, "enabled": { "default": true, "title": "Enabled", "type": "boolean" }, "stateful": { "default": true, "title": "Stateful", "type": "boolean" } }, "required": [ "id" ], "title": "Node", "type": "object" }, "NodeSpecification": { "description": "Specification for a single processing node within a tube .yaml file.", "properties": { "type": { "title": "Type", "type": "string" }, "id": { "title": "Id", "type": "string" }, "depends": { "anyOf": [ { "items": { "anyOf": [ { "type": "string" }, { "additionalProperties": { "type": "string" }, "maxProperties": 1, "minProperties": 1, "type": "object" } ] }, "type": "array" }, { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "enabled": { "default": true, "title": "Enabled", "type": "boolean" }, "stateful": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "title": "Stateful" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "type", "id" ], "title": "NodeSpecification", "type": "object" }, "State": { 
"description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.", "properties": { "assets": { "additionalProperties": { "$ref": "#/$defs/Asset" }, "title": "Assets", "type": "object" }, "dependencies": { "additionalProperties": { "$ref": "#/$defs/_AssetDependency" }, "title": "Dependencies", "type": "object" }, "scope_to_assets": { "additionalProperties": { "items": { "$ref": "#/$defs/Asset" }, "type": "array" }, "propertyNames": { "$ref": "#/$defs/AssetScope" }, "title": "Scope To Assets", "type": "object" } }, "title": "State", "type": "object" }, "_AssetDependency": { "properties": { "asset_id": { "title": "Asset Id", "type": "string" }, "signal": { "title": "Signal", "type": "string" } }, "required": [ "asset_id", "signal" ], "title": "_AssetDependency", "type": "object" } }, "required": [ "tube_id", "nodes", "edges" ] }
- Fields:
- Validators:
_create_scheduler » scheduler
assets_exhausted_after_storage » all fields
input_present » all fields
- field edges: list[Edge] [Required]¶
Edges connecting slots within nodes.
The nodes within Edge.source_node and Edge.target_node must be the same objects as those in Tube.nodes (i.e. edges[0].source_node is nodes[node_id]).
- Validated by:
assets_exhausted_after_storage
input_present
- field input_collection: InputCollection [Optional]¶
Specifications declared by the tube to be supplied
- Validated by:
assets_exhausted_after_storage
input_present
- field nodes: dict[str, Node] [Required]¶
Dictionary mapping all nodes from their ID to the instantiated node.
- Validated by:
assets_exhausted_after_storage
input_present
only_one_return
- classmethod from_specification(spec: TubeSpecification | Path | PathLike[str] | Annotated[str, FieldInfo(annotation=NoneType, required=True, metadata=[_PydanticGeneralMetadata(pattern='[\\w\\-\\/#]+')])], input: dict | None = None) Self[source]¶
Instantiate a tube model from its configuration
- Parameters:
spec (TubeSpecification) – the tube config to instantiate
- Raises:
InputMissingError – if requested tube-scoped input is not present.
- validator only_one_return » nodes[source]¶
Only one return node is allowed.
Validate both here and in the spec because tubes can be created programmatically
- validator assets_exhausted_after_storage » all fields[source]¶
Runner-scoped assets that depend on node outputs can’t be directly depended on in the topological generation in which they are stored, or in any later generation. This protects against unstructured mutation.
- in_edges(node: Node | str) list[Edge][source]¶
Edges going towards the given node (i.e. the node is the edge’s target)
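As an illustration of the lookup, the sketch below filters a list of edges by target. The `Edge` dataclass is a stand-in that copies the field names from the JSON schema above; the free function `in_edges` and its restriction to string node ids are assumptions, since the real method also accepts a `Node`.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Edge:
    """Stand-in mirroring the Edge fields listed in the JSON schema."""

    source_node: str
    source_signal: str
    target_node: str
    target_slot: Optional[Union[str, int]] = None
    required: bool = True


def in_edges(edges: list[Edge], node_id: str) -> list[Edge]:
    """Return the edges whose target is the given node id."""
    return [edge for edge in edges if edge.target_node == node_id]


edges = [
    Edge("a", "value", "b", target_slot="left"),
    Edge("a", "value", "c", target_slot=0),
    Edge("b", "value", "c", target_slot=1),
]
incoming = in_edges(edges, "c")
```

Here `incoming` holds the two edges that end at `c`, in their original order, and a node with no incoming edges yields an empty list.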
- validator input_present » all fields[source]¶
Validate the presence of required tube-scoped inputs.
Though tubes are usually instantiated from specs, this catches the case when they are constructed directly.
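The presence check can be sketched as follows, under stated assumptions: input specifications are plain dictionaries keyed by input id, `scope` defaults to `tube` as in the schema above, and `InputMissingError` here is a local stand-in for the library's exception of the same name. The function name `check_tube_inputs` is hypothetical.

```python
class InputMissingError(KeyError):
    """Local stand-in for the library's exception of the same name."""


def check_tube_inputs(specs: dict, supplied: dict) -> None:
    """Raise if any tube-scoped input declared in ``specs`` was not supplied."""
    missing = sorted(
        input_id
        for input_id, spec in specs.items()
        # The schema gives ``scope`` a default of "tube".
        if spec.get("scope", "tube") == "tube" and input_id not in supplied
    )
    if missing:
        raise InputMissingError(f"missing tube-scoped input: {', '.join(missing)}")


specs = {
    "start": {"type": "int"},  # scope defaults to "tube"
    "frame": {"type": "bytes", "scope": "process"},
}

check_tube_inputs(specs, {"start": 0})  # passes: "frame" is process-scoped

try:
    check_tube_inputs(specs, {})
    raised = False
except InputMissingError:
    raised = True
```

Process-scoped inputs are skipped because, per the InputSpecification description, they are supplied per call to TubeRunner.process rather than when the tube is instantiated.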
- model_post_init(context: Any, /) None¶
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.