state

pydantic model State

A collection of assets storing objects that persist through iterations of the tube. Typical examples include database connections, large arrays, and statistics that span multiple processes of the tube.

The State model is a container for a set of fully instantiated assets. It does not process the assets itself; that is the job of a TubeRunner.

JSON schema:
{
   "title": "State",
   "description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.",
   "type": "object",
   "properties": {
      "assets": {
         "additionalProperties": {
            "$ref": "#/$defs/Asset"
         },
         "title": "Assets",
         "type": "object"
      },
      "dependencies": {
         "additionalProperties": {
            "$ref": "#/$defs/_AssetDependency"
         },
         "title": "Dependencies",
         "type": "object"
      },
      "scope_to_assets": {
         "additionalProperties": {
            "items": {
               "$ref": "#/$defs/Asset"
            },
            "type": "array"
         },
         "propertyNames": {
            "$ref": "#/$defs/AssetScope"
         },
         "title": "Scope To Assets",
         "type": "object"
      }
   },
   "$defs": {
      "Asset": {
         "additionalProperties": false,
         "description": "An asset within a processing tube.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "spec": {
               "$ref": "#/$defs/AssetSpecification"
            },
            "scope": {
               "$ref": "#/$defs/AssetScope"
            },
            "params": {
               "additionalProperties": true,
               "title": "Params",
               "type": "object"
            },
            "depends": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Depends"
            },
            "obj": {
               "anyOf": [
                  {},
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Obj"
            },
            "stored_at": {
               "default": -1,
               "title": "Stored At",
               "type": "integer"
            }
         },
         "required": [
            "id",
            "spec",
            "scope",
            "depends"
         ],
         "title": "Asset",
         "type": "object"
      },
      "AssetScope": {
         "enum": [
            "runner",
            "process",
            "node"
         ],
         "title": "AssetScope",
         "type": "string"
      },
      "AssetSpecification": {
         "description": "Specification for a single asset within a tube .yaml file.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "type": {
               "title": "Type",
               "type": "string"
            },
            "scope": {
               "$ref": "#/$defs/AssetScope"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "depends": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            },
            "description": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Description"
            }
         },
         "required": [
            "id",
            "type",
            "scope"
         ],
         "title": "AssetSpecification",
         "type": "object"
      },
      "_AssetDependency": {
         "properties": {
            "asset_id": {
               "title": "Asset Id",
               "type": "string"
            },
            "signal": {
               "title": "Signal",
               "type": "string"
            }
         },
         "required": [
            "asset_id",
            "signal"
         ],
         "title": "_AssetDependency",
         "type": "object"
      }
   }
}

Fields:
field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Asset] [Optional]
field dependencies: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], _AssetDependency] [Optional]

Map from the node signals that assets depend on to the corresponding asset and signal IDs. See AssetSpecification.depends.

Only dependencies that require copying are included here. A value that no other node uses after the depended-on node emits it does not need to be copied, because nothing else can mutate it later in the same epoch after it has been stored.
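The reason copying matters can be seen with plain Python objects. The sketch below does not use the library; it assumes that a stored asset value is a reference to whatever the depended-on node emitted, and that a later node in the same epoch may mutate that object in place. The `store` helper is hypothetical.

```python
import copy


def store(value, needs_copy: bool):
    """Hypothetical helper: store an emitted value the way a dependent asset might."""
    # Copy only when another consumer could mutate the value later in the epoch.
    return copy.deepcopy(value) if needs_copy else value


emitted = [1, 2, 3]                        # value emitted by the depended-on node
shared = store(emitted, needs_copy=False)
protected = store(emitted, needs_copy=True)

emitted.append(4)                          # a downstream node mutates it in place

print(shared)     # [1, 2, 3, 4]  (the stored reference changed too)
print(protected)  # [1, 2, 3]     (the copy is unaffected)
```

Skipping the copy is safe only when no other node receives the value, which is why the dependencies map keeps just the entries that need it.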

field scope_to_assets: dict[AssetScope, list[Asset]] [Optional]

Map from each AssetScope to the list of Assets with that scope, so that State.init() and State.deinit() do not have to query the scope of every asset.
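A minimal sketch of how such an index can be built once up front, assuming a simplified, hypothetical `Asset` with only `id` and `scope` fields (the real model has more, as the JSON schema shows):

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum


class AssetScope(str, Enum):
    runner = "runner"
    process = "process"
    node = "node"


@dataclass
class Asset:
    # Hypothetical stand-in with only the fields this sketch needs.
    id: str
    scope: AssetScope


def index_by_scope(assets: dict[str, Asset]) -> dict[AssetScope, list[Asset]]:
    """Group assets by scope once, so init/deinit can look a scope up directly."""
    by_scope: dict[AssetScope, list[Asset]] = defaultdict(list)
    for asset in assets.values():
        by_scope[asset.scope].append(asset)
    return dict(by_scope)


assets = {
    "db": Asset("db", AssetScope.runner),
    "buffer": Asset("buffer", AssetScope.process),
    "scratch": Asset("scratch", AssetScope.node),
}
scope_to_assets = index_by_scope(assets)
print([a.id for a in scope_to_assets[AssetScope.runner]])  # ['db']
```

Each later init or deinit call then costs a single dictionary lookup instead of a scan over every asset.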

classmethod from_specification(specs: dict[str, AssetSpecification], edges: list[Edge] | None = None) → Self

Instantiate a State model from its configuration.

Parameters:
  • specs (dict[str, AssetSpecification]) – the asset specifications that make up the State configuration.

  • edges (list[Edge] | None) – If present, the edges of the whole graph. These are used to avoid unnecessary copying for assets that use dependencies to store values between epochs: if no other node depends on the value that an asset depends on, that value does not need to be copied.
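The copy-avoidance rule for edges can be sketched as a filter over the dependencies. This is an illustration under stated assumptions, not the library's code: `Edge` is a hypothetical stand-in with `source_node`, `source_signal` and `target_node` fields, dependencies are simplified to a map from asset id to a `"node_id.signal"` string, and a value is treated as needing a copy whenever some edge reads that node signal.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Edge:
    # Hypothetical stand-in: the real Edge model's fields may differ.
    source_node: str
    source_signal: str
    target_node: str


def dependencies_needing_copy(
    depends: dict[str, str], edges: list[Edge] | None
) -> dict[str, str]:
    """Return the asset_id -> 'node_id.signal' entries whose values must be copied.

    Without edges nothing can be ruled out, so every dependency is kept.
    """
    if edges is None:
        return dict(depends)
    consumed = {(e.source_node, e.source_signal) for e in edges}
    needed = {}
    for asset_id, dep in depends.items():
        node_id, signal = dep.split(".", 1)
        # Another node reads this signal, so it could mutate the value later in the epoch.
        if (node_id, signal) in consumed:
            needed[asset_id] = dep
    return needed


edges = [Edge("counter", "count", "printer")]
depends = {"total": "counter.count", "last_frame": "camera.frame"}
print(dependencies_needing_copy(depends, edges))  # {'total': 'counter.count'}
```

Passing `edges=None` falls back to copying everything, which is the conservative choice when the graph is unknown.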

clear() → None

Clear assets.

collect(edges: list[Edge], epoch: int) → dict | None

Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges()).

If none of the requested events have been emitted, return None.

If all of the requested events have been emitted, return a kwarg-like dict.

If some of the requested events are missing but others are present, return a dict in which each missing event is None.
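The three return cases can be illustrated with a small standalone function. This is a sketch, not the library's implementation: it assumes a hypothetical `Edge` with `source_node`, `source_signal` and `target_slot` (the keyword argument name on the receiving node), and an in-memory store of emitted values keyed by `(epoch, node_id, signal)`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Edge:
    # Hypothetical stand-in; the real Edge model may use different field names.
    source_node: str
    source_signal: str
    target_slot: str  # keyword argument name on the receiving node


def collect(
    emitted: dict[tuple[int, str, str], Any], edges: list[Edge], epoch: int
) -> dict[str, Any] | None:
    """Return None if nothing was emitted, else kwargs with None for missing events."""
    kwargs: dict[str, Any] = {}
    found_any = False
    for edge in edges:
        key = (epoch, edge.source_node, edge.source_signal)
        if key in emitted:  # membership test, so an emitted None still counts
            kwargs[edge.target_slot] = emitted[key]
            found_any = True
        else:
            kwargs[edge.target_slot] = None
    return kwargs if found_any else None


edges = [Edge("a", "value", "x"), Edge("b", "value", "y")]
emitted = {(0, "a", "value"): 1, (0, "b", "value"): 2}
print(collect(emitted, edges, 0))                   # {'x': 1, 'y': 2}
print(collect({(0, "a", "value"): 1}, edges, 0))    # {'x': 1, 'y': None}
print(collect(emitted, edges, 1))                   # None
```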

Todo

Add an example

deinit(scope: AssetScope, edges: list[Edge] | None = None) → None

Run Asset.deinit() for the assets that correspond to the given scope. This usually means that the Asset.obj attribute is reset to None.

For AssetScope.node, pass the node's edges to determine which assets, if any, to deinitialize. If edges are not passed, all node-scoped assets are deinitialized.

init(scope: AssetScope, edges: list[Edge] | None = None) → None

Run Asset.init() for the assets that correspond to the given scope. This usually means that the Asset.obj attribute gets populated.

For AssetScope.node, pass the node's edges to determine which assets, if any, to initialize. If edges are not passed, all node-scoped assets are initialized.
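The scope selection shared by init() and deinit() can be sketched as follows. Everything here is a hypothetical stand-in: in particular, the sketch assumes that an edge reading an asset has `source_node == "assets"` and `source_signal` equal to the asset id, which may not match the library's actual convention.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Edge:
    # Hypothetical: an edge that reads an asset is assumed to have
    # source_node == "assets" and source_signal == the asset id.
    source_node: str
    source_signal: str


@dataclass
class Asset:
    # Hypothetical stand-in for the library's Asset model.
    id: str
    scope: str  # "runner", "process" or "node"
    factory: Callable[[], Any]
    obj: Any = None

    def init(self) -> None:
        self.obj = self.factory()

    def deinit(self) -> None:
        self.obj = None


def select(assets: list[Asset], scope: str, edges: list[Edge] | None) -> list[Asset]:
    """Pick the assets for a scope; node scope is narrowed by the node's edges if given."""
    in_scope = [a for a in assets if a.scope == scope]
    if scope != "node" or edges is None:
        return in_scope
    used = {e.source_signal for e in edges if e.source_node == "assets"}
    return [a for a in in_scope if a.id in used]


def init(assets: list[Asset], scope: str, edges: list[Edge] | None = None) -> None:
    for asset in select(assets, scope, edges):
        asset.init()


def deinit(assets: list[Asset], scope: str, edges: list[Edge] | None = None) -> None:
    for asset in select(assets, scope, edges):
        asset.deinit()


assets = [Asset("db", "runner", dict), Asset("scratch", "node", list), Asset("other", "node", list)]
init(assets, "node", [Edge("assets", "scratch")])  # only "scratch" is used by this node
```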

init_context(scope: AssetScope, edges: list[Edge] | None = None) → Iterator[None]

Context manager for initializing and deinitializing assets by scope.
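A context manager of this shape is typically written with `contextlib.contextmanager` and a `try`/`finally`, so that deinitialization runs even if the body raises. The sketch below assumes that behavior; `ToyState` is a hypothetical stand-in that only records calls.

```python
from contextlib import contextmanager
from typing import Iterator


class ToyState:
    """Hypothetical stand-in that records init/deinit calls per scope."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def init(self, scope: str) -> None:
        self.log.append(f"init {scope}")

    def deinit(self, scope: str) -> None:
        self.log.append(f"deinit {scope}")

    @contextmanager
    def init_context(self, scope: str) -> Iterator[None]:
        # Assumed behavior: initialize on entry, always deinitialize on exit.
        self.init(scope)
        try:
            yield
        finally:
            self.deinit(scope)


state = ToyState()
with state.init_context("process"):
    state.log.append("processing")
print(state.log)  # ['init process', 'processing', 'deinit process']
```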

update(events: list[Event]) → None

Update each asset that depends on a node signal from the given events.
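A minimal sketch of this update step, under assumptions: `Event` is a hypothetical stand-in with `node_id`, `signal` and `value`; dependencies are simplified to a map from asset id to a `"node_id.signal"` string; and `needs_copy` holds the asset ids whose values must be copied, mirroring the dependencies field above.

```python
from __future__ import annotations

import copy
from dataclasses import dataclass
from typing import Any


@dataclass
class Event:
    # Hypothetical stand-in for the library's Event model.
    node_id: str
    signal: str
    value: Any


def update(
    asset_objs: dict[str, Any],
    depends: dict[str, str],
    needs_copy: set[str],
    events: list[Event],
) -> None:
    """Store each event's value in every asset that depends on its node signal."""
    for event in events:
        key = f"{event.node_id}.{event.signal}"
        for asset_id, dep in depends.items():
            if dep == key:
                # Copy only where another node could still mutate the value.
                value = event.value
                asset_objs[asset_id] = copy.deepcopy(value) if asset_id in needs_copy else value


objs = {"total": 0}
update(objs, {"total": "counter.count"}, set(), [Event("counter", "count", 5), Event("other", "x", 9)])
print(objs)  # {'total': 5}
```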