Source code for noob.asset
import inspect
from collections.abc import Callable
from copy import deepcopy
from enum import StrEnum
from typing import Any, Generic, ParamSpec, Self, TypeVar
from pydantic import BaseModel, ConfigDict, Field, model_validator
from noob.types import AbsoluteIdentifier, DependencyIdentifier, PythonIdentifier
from noob.utils import resolve_python_identifier
TOutput = TypeVar("TOutput")
PWrap = ParamSpec("PWrap")
[docs]
class AssetScope(StrEnum):
runner = "runner"
"""
Asset persists through the entire lifespan of the runner.
Can be modified and passed through different epochs.
"""
process = "process"
"""
Asset persists through the a single process call.
Can be modified and passed through different nodes but are
re-instantiated at the beginning of each epoch.
"""
node = "node"
"""
Asset is re-instantiated every node call.
"""
"""Defines at which scale the resource should be locked."""
[docs]
class AssetSpecification(BaseModel):
"""
Specification for a single asset within a tube .yaml file.
"""
id: PythonIdentifier
"""The unique identifier of the asset"""
type_: AbsoluteIdentifier = Field(..., alias="type")
"""The python path to the location of the asset
e.g. package_name.module_name.ClassName
"""
scope: AssetScope
"""The scope of the asset. See :class:`.AssetScope`"""
params: dict | None = None
"""Initialization parameters"""
depends: DependencyIdentifier | None = None
"""
Roundtrip dependency. Should point to the last node in a given
epoch that manipulates the asset.
May only be used with scope == "runner"
Typically this is used with assets that are mutated by multiple nodes in a tube.
In that case, nodes should use dependencies to structure the order of mutation:
The first node that should have it should depend directly on the asset,
and then it and each node should emit the asset
so that the successive node can depend on that signal.
The node signal that this asset specification depends on will be the version of the asset
stored and used in the next processing epoch.
"""
description: str | None = None
"""An optional description of the asset"""
[docs]
@model_validator(mode="after")
def validate_depends(self) -> Self:
"""
depends can only be used with scope == "runner"
"""
if self.depends is not None and self.scope != AssetScope.runner:
raise ValueError(
f"'depends' must be used with scope 'runner'. Provided scope: {self.scope.value}"
)
return self
[docs]
class Asset(BaseModel):
"""An asset within a processing tube."""
id: PythonIdentifier
"""The unique identifier of the asset"""
spec: AssetSpecification
"""The specs of the asset. See :class:`.AssetSpecification`"""
scope: AssetScope
"""The scope of the asset. See :class:`.AssetScope`"""
params: dict[str, Any] = Field(default_factory=dict)
"""Initialization parameters"""
depends: DependencyIdentifier | None
"""The signal that this asset gets updated by. See :attr:`.AssetSpecification.depends`"""
obj: Any | None = None
"""Instantiated asset instance"""
stored_at: int = -1
"""The latest epoch the asset was stored at. Only used when depends is not `None`"""
model_config = ConfigDict(extra="forbid")
[docs]
def init(self) -> None:
"""
Initialize the asset instance.
Default is a no-op.
Subclasses do not need to override if they have no initialization logic.
"""
pass
[docs]
def deinit(self) -> None:
"""
Deinitialize the asset instance.
Default is a no-op.
Subclasses do not need to override if they have no deinit logic.
"""
pass
[docs]
@classmethod
def from_specification(cls, spec: "AssetSpecification") -> "Asset":
"""
Create an asset from its spec
- resolve the asset type
- if a subclass of :class:`.Asset`, just instantiate it
- if not a subclass :class:`.Asset`, wrap it in :class:`.WrapClassAsset`
- if a function, wrap it in :class:`.WrapFuncAsset`
"""
obj = resolve_python_identifier(spec.type_)
params = spec.params if spec.params is not None else {}
scope = spec.scope
depends = spec.depends if spec.depends is not None else None
# check if function by checking if callable -
# Node classes do not have __call__ defined and thus should not be callable
if inspect.isclass(obj):
if issubclass(obj, Asset):
return obj(id=spec.id, spec=spec, scope=scope, depends=depends, **params)
else:
return WrapClassAsset(
id=spec.id, cls=obj, spec=spec, params=params, scope=scope, depends=depends
)
else:
return WrapFuncAsset(
id=spec.id, fn=obj, spec=spec, params=params, scope=scope, depends=depends
)
[docs]
def update(self, value: Any, epoch: int) -> None:
self.obj = deepcopy(value)
self.stored_at = epoch
T = TypeVar("T")
[docs]
class WrapClassAsset(Asset, Generic[T]):
"""
Wrap a non-Asset class.
Wrapping allows us to use arbitrary classes as Assets within noob. Initializes
the inner class to hold the class instance as an asset object.
After instantiating the outer wrapping class, instantiate the inner wrapped class
using the `params` given to the outer wrapping class during :meth:`.init` .
"""
cls: type
obj: T | None = None
[docs]
def init(self) -> None:
self.obj = self.cls(**self.params)
[docs]
def deinit(self) -> None:
self.obj = None
[docs]
class WrapFuncAsset(Asset):
"""
Wrap a function to build an Asset.
The function effectively takes the role of :meth:`.__init__`, with the outer wrapping class
`params` being injected as function parameters. The output of the function becomes the
asset object.
"""
fn: Callable
[docs]
def init(self) -> None:
self.obj = self.fn(**self.params)
[docs]
def deinit(self) -> None:
self.obj = None