state
- pydantic model State
A collection of assets storing objects that persist through iterations of the tube. Typical use cases include database connections, large arrays, and statistics that span multiple processes of the tube.
The State model is a container for a set of assets that are fully instantiated. It does not handle processing the assets; that is handled by a TubeRunner.
JSON schema:
{ "title": "State", "description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.", "type": "object", "properties": { "assets": { "additionalProperties": { "$ref": "#/$defs/Asset" }, "title": "Assets", "type": "object" }, "dependencies": { "additionalProperties": { "$ref": "#/$defs/_AssetDependency" }, "title": "Dependencies", "type": "object" }, "scope_to_assets": { "additionalProperties": { "items": { "$ref": "#/$defs/Asset" }, "type": "array" }, "propertyNames": { "$ref": "#/$defs/AssetScope" }, "title": "Scope To Assets", "type": "object" } }, "$defs": { "Asset": { "additionalProperties": false, "description": "An asset within a processing tube.", "properties": { "id": { "title": "Id", "type": "string" }, "spec": { "$ref": "#/$defs/AssetSpecification" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "additionalProperties": true, "title": "Params", "type": "object" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "title": "Depends" }, "obj": { "anyOf": [ {}, { "type": "null" } ], "default": null, "title": "Obj" }, "stored_at": { "default": -1, "title": "Stored At", "type": "integer" } }, "required": [ "id", "spec", "scope", "depends" ], "title": "Asset", "type": "object" }, "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", "type": "string" }, "AssetSpecification": { "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" 
}, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Depends" }, "description": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Description" } }, "required": [ "id", "type", "scope" ], "title": "AssetSpecification", "type": "object" }, "_AssetDependency": { "properties": { "asset_id": { "title": "Asset Id", "type": "string" }, "signal": { "title": "Signal", "type": "string" } }, "required": [ "asset_id", "signal" ], "title": "_AssetDependency", "type": "object" } } }
- Fields:
- field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Asset] [Optional]
- field dependencies: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], _AssetDependency] [Optional]
Map from node signals that assets depend on to the asset and signal ids. See AssetSpecification.depends.
Only those dependencies that require copying are included here. Assets that are not used after the node they depend on emits them don't need to be copied to protect against mutation within the same epoch after they are stored.
- field scope_to_assets: dict[AssetScope, list[Asset]] [Optional]
Map from AssetScope to the Asset objects with that scope, to avoid querying the scope of each asset in State.init() and State.deinit().
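The idea behind a scope index like scope_to_assets can be sketched as follows. This is a minimal illustration, not the library's implementation: FakeAsset and group_by_scope are hypothetical stand-ins, and only the three AssetScope values are taken from the JSON schema.

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum


class AssetScope(str, Enum):
    # The three values listed for AssetScope in the JSON schema.
    runner = "runner"
    process = "process"
    node = "node"


@dataclass
class FakeAsset:
    # Hypothetical stand-in for Asset: only the fields this sketch needs.
    id: str
    scope: AssetScope


def group_by_scope(assets: dict[str, FakeAsset]) -> dict[AssetScope, list[FakeAsset]]:
    """Build the scope -> assets index once, so later lookups avoid a full scan."""
    index: dict[AssetScope, list[FakeAsset]] = defaultdict(list)
    for asset in assets.values():
        index[asset.scope].append(asset)
    return dict(index)


assets = {
    "db": FakeAsset("db", AssetScope.runner),
    "buffer": FakeAsset("buffer", AssetScope.process),
    "cache": FakeAsset("cache", AssetScope.runner),
}
scope_to_assets = group_by_scope(assets)
runner_ids = [a.id for a in scope_to_assets[AssetScope.runner]]
```

With an index like this, an init or deinit call for one scope only touches the assets listed under that scope.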
- classmethod from_specification(specs: dict[str, AssetSpecification], edges: list[Edge] | None = None) → Self
Instantiate a State model from its configuration.
- Parameters:
  - specs (dict[str, AssetSpecification]) – the State config to instantiate
  - edges (list[Edge] | None) – If present, edges for the whole graph, used to reduce copying for assets that use dependencies to store values between epochs. If no other nodes depend on the value that the asset depends on, the value does not have to be copied.
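The copy-reduction rule described for edges might look roughly like the sketch below. Everything here is an assumption made for illustration: FakeEdge, its field names, the "node.signal" format of the depends string, and the choice to always copy when no edges are given are not confirmed by the documentation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeEdge:
    # Hypothetical stand-in for Edge; the real field names may differ.
    source_node: str
    source_signal: str
    target_node: str


def needs_copy(depends: str, edges: Optional[list[FakeEdge]]) -> bool:
    """Decide whether a depended-on value should be copied before it is stored.

    Assumes `depends` has the form "node.signal".
    """
    if edges is None:
        # Assumption: without graph information, skipping the copy cannot be
        # shown to be safe, so copy.
        return True
    node, signal = depends.split(".", 1)
    # If any node consumes the same signal, it could mutate the value within
    # the same epoch, so the stored value should be a copy.
    return any(e.source_node == node and e.source_signal == signal for e in edges)


edges = [FakeEdge("a", "value", "b")]
shared = needs_copy("a.value", edges)      # another node reads a.value
unshared = needs_copy("a.other", edges)    # nothing else reads a.other
no_graph = needs_copy("a.other", None)     # no edges supplied
```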
- collect(edges: list[Edge], epoch: int) → dict | None
Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges()).
If none of the requested events have been emitted, return None.
If all of the requested events have been emitted, return a kwarg-like dict.
If some of the requested events are missing but others are present, return a kwarg-like dict with None for any missing events.
Todo: Add an example
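The three return cases can be illustrated with a small stand-alone sketch. It does not use the library: FakeEdge, its field names, and the events dict keyed by (node, signal) are hypothetical, and the epoch argument is left out by assuming the events already belong to a single epoch.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class FakeEdge:
    # Hypothetical stand-in for Edge; the real field names may differ.
    source_node: str
    source_signal: str
    target_slot: str


def collect(
    events: dict[tuple[str, str], Any], edges: list[FakeEdge]
) -> Optional[dict[str, Any]]:
    """Map emitted events onto the keyword arguments named by the inbound edges."""
    found_any = False
    kwargs: dict[str, Any] = {}
    for edge in edges:
        key = (edge.source_node, edge.source_signal)
        if key in events:
            found_any = True
        # Missing events are filled in as None.
        kwargs[edge.target_slot] = events.get(key)
    # No requested event was emitted at all: signal that with None.
    return kwargs if found_any else None


edges = [FakeEdge("a", "value", "x"), FakeEdge("b", "value", "y")]
none_emitted = collect({}, edges)
all_emitted = collect({("a", "value"): 1, ("b", "value"): 2}, edges)
some_emitted = collect({("a", "value"): 1}, edges)
```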
- deinit(scope: AssetScope, edges: list[Edge] | None = None) → None
Run Asset.deinit() for assets that correspond to the given scope. This usually means that the Asset.obj attribute is cleared to None.
For AssetScope.node, the node's edges should be provided to determine which assets to deinitialize, if any. If not passed, all node-scoped assets are deinitialized.
- init(scope: AssetScope, edges: list[Edge] | None = None) → None
Run Asset.init() for assets that correspond to the given scope. This usually means that the Asset.obj attribute gets populated.
For AssetScope.node, the node's edges should be provided to determine which assets to initialize, if any. If not passed, all node-scoped assets are initialized.
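The init and deinit lifecycle can be sketched with plain dataclasses. FakeAsset, FakeState, and the factory field are hypothetical stand-ins, scopes are plain strings for brevity, and the edge-based filtering for node-scoped assets is omitted because the documentation does not describe how edges select assets.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class FakeAsset:
    # Hypothetical stand-in for Asset; `factory` is an illustrative field.
    id: str
    scope: str
    factory: Callable[[], Any]
    obj: Optional[Any] = None

    def init(self) -> None:
        # Populate the held object.
        self.obj = self.factory()

    def deinit(self) -> None:
        # Clear the held object.
        self.obj = None


@dataclass
class FakeState:
    scope_to_assets: dict[str, list[FakeAsset]] = field(default_factory=dict)

    def init(self, scope: str) -> None:
        for asset in self.scope_to_assets.get(scope, []):
            asset.init()

    def deinit(self, scope: str) -> None:
        for asset in self.scope_to_assets.get(scope, []):
            asset.deinit()


db = FakeAsset("db", "runner", dict)
buf = FakeAsset("buf", "process", list)
state = FakeState({"runner": [db], "process": [buf]})

state.init("runner")
after_init = (db.obj, buf.obj)   # only the runner-scoped asset is populated
state.deinit("runner")
after_deinit = db.obj
```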