base

pydantic model Slot

Show JSON schema
{
   "title": "Slot",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "annotation": {
         "title": "Annotation"
      },
      "required": {
         "default": true,
         "title": "Required",
         "type": "boolean"
      }
   },
   "required": [
      "name",
      "annotation"
   ]
}

Fields:
field annotation: Any [Required]
field name: str [Required]
field required: bool = True
classmethod from_callable(func: Callable) → dict[str, Slot]
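
As a rough illustration of how slots could be derived from a callable, the sketch below builds one entry per parameter with inspect.signature. SlotSketch and slots_from_callable are hypothetical stand-ins, not the library's code, and treating parameters with defaults as not required is an assumption.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SlotSketch:
    """Hypothetical stand-in carrying the same three fields as Slot."""

    name: str
    annotation: Any
    required: bool = True


def slots_from_callable(func: Callable) -> dict[str, SlotSketch]:
    """Build one slot per parameter (assumption: defaults mean optional)."""
    slots = {}
    for name, param in inspect.signature(func).parameters.items():
        slots[name] = SlotSketch(
            name=name,
            annotation=param.annotation,
            required=param.default is inspect.Parameter.empty,
        )
    return slots


def scale(value: float, factor: float = 2.0) -> float:
    return value * factor


slots = slots_from_callable(scale)
```

Here slots is keyed by parameter name, so a dependency that targets a keyword argument can be matched against it by name.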
pydantic model Signal

Show JSON schema
{
   "title": "Signal",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "type_": {
         "title": "Type",
         "type": "null"
      }
   },
   "required": [
      "name",
      "type_"
   ]
}

Config:
  • arbitrary_types_allowed: bool = True

Fields:
field name: str [Required]
field type_: type | None | UnionType | GenericAlias [Required]
classmethod from_callable(func: Callable) → list[Signal]
pydantic model Edge

Directed connection between an output slot of a node and an input slot in another node

Show JSON schema
{
   "title": "Edge",
   "description": "Directed connection between an output slot a node and an input slot in another node",
   "type": "object",
   "properties": {
      "source_node": {
         "title": "Source Node",
         "type": "string"
      },
      "source_signal": {
         "title": "Source Signal",
         "type": "string"
      },
      "target_node": {
         "title": "Target Node",
         "type": "string"
      },
      "target_slot": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Target Slot"
      },
      "required": {
         "default": true,
         "title": "Required",
         "type": "boolean"
      }
   },
   "required": [
      "source_node",
      "source_signal",
      "target_node"
   ]
}

Fields:
field required: bool = True
field source_node: str [Required]
field source_signal: str [Required]
field target_node: str [Required]
field target_slot: str | int | None = None
  • For keyword arguments, target_slot is the name of the keyword argument that the value is passed to.

  • For positional arguments, target_slot is an integer giving the index of the argument.

  • For scalar arguments, target_slot is None.
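
The three target_slot cases above can be sketched as a small routing function. bind_inputs is a hypothetical helper written for illustration; in particular, treating a None slot as "pass the value as the only positional argument" is an assumption about what a scalar argument means.

```python
from typing import Any, Union


def bind_inputs(
    deliveries: list[tuple[Union[str, int, None], Any]],
) -> tuple[list[Any], dict[str, Any]]:
    """Sort (target_slot, value) pairs into positional and keyword arguments."""
    positional: dict[int, Any] = {}
    kwargs: dict[str, Any] = {}
    for target_slot, value in deliveries:
        if target_slot is None:
            # Scalar case (assumption): the value is the single argument.
            positional[0] = value
        elif isinstance(target_slot, int):
            # Positional case: the slot is the argument index.
            positional[target_slot] = value
        else:
            # Keyword case: the slot is the keyword argument name.
            kwargs[target_slot] = value
    args = [positional[i] for i in sorted(positional)]
    return args, kwargs


args, kwargs = bind_inputs([(1, "b"), ("scale", 2.0), (0, "a")])
scalar_args, scalar_kwargs = bind_inputs([(None, 42)])
```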

process_method(func: _TProcess) → _TProcess

Decorator to mark a method as the designated ‘process’ method for a class.
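
A marker decorator of this kind can be sketched in a few lines. The names mark_process, find_process, and the marker attribute below are hypothetical; how the real decorator records the designation is not stated here.

```python
import inspect
from typing import Callable, Optional, TypeVar

_T = TypeVar("_T", bound=Callable)

# Hypothetical marker attribute; the real decorator may record this differently.
_MARKER = "__is_process_method__"


def mark_process(func: _T) -> _T:
    """Tag the function and return it unchanged, so its signature is preserved."""
    setattr(func, _MARKER, True)
    return func


def find_process(cls: type) -> Optional[str]:
    """Return the name of the first method tagged by mark_process, if any."""
    for name, member in inspect.getmembers(cls, inspect.isfunction):
        if getattr(member, _MARKER, False):
            return name
    return None


class Counter:
    def __init__(self) -> None:
        self.count = 0

    @mark_process
    def step(self) -> int:
        self.count += 1
        return self.count

    def process(self) -> str:
        # An unrelated method that merely happens to be named "process".
        return "not the node process method"


found = find_process(Counter)
```

Because the decorator only tags the function, the class keeps working exactly as before when used outside of a node.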

pydantic model Node

A node within a processing tube

Show JSON schema
{
   "title": "Node",
   "description": "A node within a processing tube",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "type": "string"
      },
      "spec": {
         "anyOf": [
            {
               "$ref": "#/$defs/NodeSpecification"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "enabled": {
         "default": true,
         "title": "Enabled",
         "type": "boolean"
      },
      "stateful": {
         "default": true,
         "title": "Stateful",
         "type": "boolean"
      }
   },
   "$defs": {
      "NodeSpecification": {
         "description": "Specification for a single processing node within a tube .yaml file.",
         "properties": {
            "type": {
               "title": "Type",
               "type": "string"
            },
            "id": {
               "title": "Id",
               "type": "string"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "anyOf": [
                           {
                              "type": "string"
                           },
                           {
                              "additionalProperties": {
                                 "type": "string"
                              },
                              "maxProperties": 1,
                              "minProperties": 1,
                              "type": "object"
                           }
                        ]
                     },
                     "type": "array"
                  },
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "enabled": {
               "default": true,
               "title": "Enabled",
               "type": "boolean"
            },
            "stateful": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Stateful"
            },
            "description": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Description"
            }
         },
         "required": [
            "type",
            "id"
         ],
         "title": "NodeSpecification",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "id"
   ]
}

Config:
  • extra: str = forbid

Fields:
field enabled: bool = True

Starting state for a node being enabled. When a node is disabled, it will be deinitialized and removed from the processing graph, but the node object will still be kept by the Tube. Nodes can be disabled and enabled during a tube’s operation without recreating the tube. When a node is disabled, other nodes that depend on it will not be disabled, but they may never be called since their dependencies will never be satisfied.

field id: str [Required]

Unique identifier of the node

field spec: NodeSpecification | None = None
field stateful: bool = True

Whether this node is stateful (True), or stateless (False). Stateful nodes are assumed to care about the order in which they receive events - i.e. for a given set of inputs, the values returned by process are different when called in a different order.

This attribute has no effect in synchronous runners, but in concurrent runners where multiple epochs of events can be processed simultaneously, setting a node as stateless can improve performance as the node processes events as soon as it receives them rather than waiting for the next epoch in the sequence to arrive.

Defined as an instance attribute, rather than a class attribute, so that it can be overridden by a node specification. Subclasses should override the default value to be considered stateless by default.

By default, unless specified otherwise:

  • Class nodes are considered stateful

  • Generator nodes are considered stateful

  • Function nodes are considered stateless

classmethod from_specification(spec: NodeSpecification, input_collection: InputCollection | None = None) → Node

Create a node from its spec

  • resolve the node type

  • if a function, wrap it in a node class

  • if a class, just instantiate it
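
The steps above can be sketched with the standard library alone. This is not the library's implementation: resolve and build are hypothetical helpers, and the sketch assumes the node type is an importable dotted path, which the text here does not confirm.

```python
import functools
import importlib
import inspect
from typing import Any


def resolve(dotted_path: str) -> Any:
    """Import 'package.module.attr' and return the attribute."""
    module_name, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def build(node_type: str, params: dict[str, Any]) -> tuple[str, Any]:
    """Instantiate classes; wrap functions with their params bound."""
    obj = resolve(node_type)
    if inspect.isclass(obj):
        return "class", obj(**params)
    return "function", functools.partial(obj, **params)


# Standard-library objects stand in for user-defined node types.
kind_a, half = build("fractions.Fraction", {"numerator": 1, "denominator": 2})
kind_b, dump = build("json.dumps", {"sort_keys": True})
dumped = dump({"b": 1, "a": 2})
```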

deinit() → None

Stop producing, processing, or receiving data.

Default is a no-op. Subclasses do not need to override if they have no deinit logic.

init() → None

Start producing, processing, or receiving data.

Default is a no-op. Subclasses do not need to override if they have no initialization logic.

Subclasses MAY add a context: RunnerContext param to request information about the enclosing runner while initializing.

model_post_init(_Node__context: Any) → None

See the docstring of process() for a description of the post-init wrapping of generators.

process(*args: Any, **kwargs: Any) → Any | None

Process some input, emitting it. See subclasses for details.

If the process method is a generator, when the Node class is instantiated, this method is replaced by one that wraps creating and calling the generator.

Something like this:

```python
gen = self.process()
self.process = lambda: next(gen)
```

Note that send handling is not implemented for generators, so process methods that are generators cannot depend on events from any other nodes (i.e. behave like source nodes).
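
A self-contained version of that replacement, using a hypothetical CountingSource class rather than a real Node subclass, might look like this. The check with inspect.isgeneratorfunction is an assumption about how the generator case is detected.

```python
import inspect
from typing import Iterator


class CountingSource:
    """Hypothetical source whose process method is a generator."""

    def __init__(self) -> None:
        # If process is a generator function, create the generator once
        # and rebind process on the instance to advance it.
        if inspect.isgeneratorfunction(self.process):
            gen = self.process()
            self.process = lambda: next(gen)

    def process(self) -> Iterator[int]:
        n = 0
        while True:
            yield n
            n += 1


source = CountingSource()
values = [source.process() for _ in range(3)]
```

Each call to source.process() now returns the next yielded value instead of a fresh generator object. Nothing is ever sent into the generator, which is why such a node takes no inputs.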

property edges: list[Edge]

The dependencies this node has declared, expressed as edges between another node’s signals and our slots

property signals: list[Signal]
property slots: dict[str, Slot]
pydantic model WrapClassNode

Wrap a non-Node class that has annotated one of its methods as being the “process” method using the process_method() decorator.

Wrapping allows us to use arbitrary classes as Nodes within noob, which expects a process method, but avoids the problem of potentially breaking the class if it has its own attribute or method named process.

After instantiating the outer wrapping class, instantiate the inner wrapped class using the params given to the outer wrapping class during model_post_init(). Then dynamically assign the discovered process method on the inner class to the outer class as process.

Dynamic discovery at instantiation time, rather than statically defining an outer process method that then calls the inner method annotated with process_method, does two things:

  • Allows us to statically infer whether the method is a regular function that returns or a generator, using inspect.isgeneratorfunction(), which relies on a flag set on a method at the time it is defined: e.g. a method that internally switches between return self._wrapped() and yield from self._wrapped() would not be correctly detected.

  • Avoids modifying the signature of the wrapped process method with generic args and kwargs
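
Both points can be checked with plain functions. The sketch below is illustrative only; generic_wrapper is a hypothetical stand-in for a statically defined outer process method that forwards to the inner one.

```python
import inspect


def returns_value():
    return 1


def yields_values():
    yield 1


def generic_wrapper(inner):
    """A static outer method that forwards to inner, hiding what inner is."""

    def process(*args, **kwargs):
        return inner(*args, **kwargs)

    return process


# The generator flag is fixed when each function is compiled.
direct = (
    inspect.isgeneratorfunction(returns_value),
    inspect.isgeneratorfunction(yields_values),
)

# A forwarding wrapper is a regular function even when it wraps a generator,
# and its signature collapses to generic args and kwargs.
wrapped = generic_wrapper(yields_values)
wrapped_is_gen = inspect.isgeneratorfunction(wrapped)
wrapped_sig = str(inspect.signature(wrapped))
```

Assigning the discovered method directly, instead of forwarding through a wrapper like this one, keeps both the generator flag and the original signature visible.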

Show JSON schema
{
   "title": "WrapClassNode",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "type": "string"
      },
      "spec": {
         "anyOf": [
            {
               "$ref": "#/$defs/NodeSpecification"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "enabled": {
         "default": true,
         "title": "Enabled",
         "type": "boolean"
      },
      "stateful": {
         "default": true,
         "title": "Stateful",
         "type": "boolean"
      },
      "cls": {
         "default": null,
         "title": "Cls"
      },
      "params": {
         "additionalProperties": true,
         "title": "Params",
         "type": "object"
      },
      "instance": {
         "default": null,
         "title": "Instance"
      }
   },
   "$defs": {
      "NodeSpecification": {
         "description": "Specification for a single processing node within a tube .yaml file.",
         "properties": {
            "type": {
               "title": "Type",
               "type": "string"
            },
            "id": {
               "title": "Id",
               "type": "string"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "anyOf": [
                           {
                              "type": "string"
                           },
                           {
                              "additionalProperties": {
                                 "type": "string"
                              },
                              "maxProperties": 1,
                              "minProperties": 1,
                              "type": "object"
                           }
                        ]
                     },
                     "type": "array"
                  },
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "enabled": {
               "default": true,
               "title": "Enabled",
               "type": "boolean"
            },
            "stateful": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Stateful"
            },
            "description": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Description"
            }
         },
         "required": [
            "type",
            "id"
         ],
         "title": "NodeSpecification",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "id"
   ]
}

Config:
  • extra: str = forbid

Fields:
field cls: type [Required]
field instance: type | None = None
field params: dict[str, Any] [Optional]
deinit() → None
model_post_init(context: Any, /) → None

Get the method decorated with process_method() and assign it to process; see the class docstring.

pydantic model WrapFuncNode

Show JSON schema
{
   "title": "WrapFuncNode",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "type": "string"
      },
      "spec": {
         "anyOf": [
            {
               "$ref": "#/$defs/NodeSpecification"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "enabled": {
         "default": true,
         "title": "Enabled",
         "type": "boolean"
      },
      "stateful": {
         "default": false,
         "title": "Stateful",
         "type": "boolean"
      },
      "fn": {
         "default": null,
         "title": "Fn"
      },
      "params": {
         "additionalProperties": true,
         "title": "Params",
         "type": "object"
      }
   },
   "$defs": {
      "NodeSpecification": {
         "description": "Specification for a single processing node within a tube .yaml file.",
         "properties": {
            "type": {
               "title": "Type",
               "type": "string"
            },
            "id": {
               "title": "Id",
               "type": "string"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "anyOf": [
                           {
                              "type": "string"
                           },
                           {
                              "additionalProperties": {
                                 "type": "string"
                              },
                              "maxProperties": 1,
                              "minProperties": 1,
                              "type": "object"
                           }
                        ]
                     },
                     "type": "array"
                  },
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "enabled": {
               "default": true,
               "title": "Enabled",
               "type": "boolean"
            },
            "stateful": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Stateful"
            },
            "description": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Description"
            }
         },
         "required": [
            "type",
            "id"
         ],
         "title": "NodeSpecification",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "id"
   ]
}

Config:
  • extra: str = forbid

Fields:
Validators:
  • set_default_statefulness » all fields
field fn: Callable [Required]
Validated by:
  • set_default_statefulness
field params: dict [Optional]
Validated by:
  • set_default_statefulness
field stateful: bool = False

Function nodes are considered stateless by default, except if they are generators, which are typically stateful.

Validated by:
  • set_default_statefulness
validator set_default_statefulness  »  all fields

If no stateful argument is provided explicitly, default stateful to False for functions and True for generators.
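
The defaulting rule can be sketched as a plain function. default_statefulness is a hypothetical helper, not the validator itself, and it assumes generators are detected with inspect.isgeneratorfunction.

```python
import inspect
from typing import Callable, Optional


def default_statefulness(fn: Callable, stateful: Optional[bool] = None) -> bool:
    """Keep an explicit choice; otherwise only generators default to stateful."""
    if stateful is not None:
        return stateful
    return inspect.isgeneratorfunction(fn)


def double(value: int) -> int:
    return value * 2


def ticker():
    n = 0
    while True:
        yield n
        n += 1


defaults = (default_statefulness(double), default_statefulness(ticker))
explicit = default_statefulness(double, stateful=True)
```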

model_post_init(_WrapFuncNode__context: Any) → None

Complete wrapping fn without calling super(), because params must be passed to the function directly if it is a generator, and bound with functools.partial() if it is not.
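
A minimal sketch of the two wrapping paths follows. wrap_callable is a hypothetical function for illustration; the real method works on the model's fn and params fields and may differ in detail.

```python
import functools
import inspect
from typing import Any, Callable


def wrap_callable(fn: Callable, params: dict[str, Any]) -> Callable:
    """Return a process callable for fn with params already applied."""
    if inspect.isgeneratorfunction(fn):
        # Generators receive params once, at creation; each call advances it.
        gen = fn(**params)
        return lambda: next(gen)
    # Regular functions get params bound as keyword arguments on every call.
    return functools.partial(fn, **params)


def multiply(value: int, factor: int = 1) -> int:
    return value * factor


def count_from(start: int = 0):
    while True:
        yield start
        start += 1


process_fn = wrap_callable(multiply, {"factor": 3})
process_gen = wrap_callable(count_from, {"start": 5})
results = (process_fn(2), process_gen(), process_gen())
```

The partial keeps the function stateless between calls, while the generator path carries its state in the generator object, which matches the different stateful defaults for the two cases.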