base
- class TubeRunner(tube: Tube, store: EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None)
Abstract parent class for tube runners.
Tube runners handle calling the nodes and passing the events returned by them to each other. Each runner may do so however it needs to (synchronously, asynchronously, alone or as part of a cluster, etc.) as long as it satisfies this abstract interface.
- store: EventStore
- max_iter_loops: int = 100
The maximum number of times that iter will call process to try to get a result.
- process(**kwargs: Any) → None | dict[str, Any] | Any
Process one step of data from each of the sources, passing intermediate data to any subscribed nodes in a chain.
The process method normally does not return anything, except when using the special Return node.

Process-scoped inputs can be passed as kwargs.

The base process method is implemented as a series of lifecycle methods and hooks corresponding to different stages of a process call. Subclasses may override each of these methods to customize runner behavior. Subclasses may also override the TubeRunner.process() method itself, but must ensure that the phases of the base process method are executed.

The methods invoked, in order (see docstrings for each for further explanation):
_validate_input(), _before_process(), _filter_ready(), _get_node(), _collect_input(), _before_call_node(), _call_node(), _after_call_node(), _handle_events(), _after_process()
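To make the phase ordering concrete, here is a minimal structural sketch of such a pipeline. This is not the actual noob implementation: the SketchRunner and TracingRunner classes, the hook bodies, and the event-list plumbing are all illustrative assumptions; only the hook names come from the list above.

```python
from typing import Any


class SketchRunner:
    """Illustrative skeleton: process() as a fixed pipeline of overridable hooks."""

    def process(self, **kwargs: Any) -> Any:
        self._validate_input(kwargs)
        self._before_process()
        events: list[Any] = []
        for node_id in self._filter_ready():
            node = self._get_node(node_id)
            inputs = self._collect_input(node, kwargs)
            self._before_call_node(node)
            events.extend(self._call_node(node, inputs))
            self._after_call_node(node)
        result = self._handle_events(events)
        self._after_process()
        return result

    # Hook defaults; subclasses override only the phases they need.
    def _validate_input(self, kwargs: dict) -> None: ...
    def _before_process(self) -> None: ...
    def _filter_ready(self) -> list: return []
    def _get_node(self, node_id: Any) -> Any: return node_id
    def _collect_input(self, node: Any, kwargs: dict) -> dict: return {}
    def _before_call_node(self, node: Any) -> None: ...
    def _call_node(self, node: Any, inputs: dict) -> list: return []
    def _after_call_node(self, node: Any) -> None: ...
    def _handle_events(self, events: list) -> Any: return None
    def _after_process(self) -> None: ...


class TracingRunner(SketchRunner):
    """Customizes two phases without re-implementing process() itself."""

    def __init__(self) -> None:
        self.trace: list[str] = []

    def _before_process(self) -> None:
        self.trace.append("before")

    def _after_process(self) -> None:
        self.trace.append("after")
```

A subclass that overrode process() wholesale would still be responsible for executing each of these phases itself.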
Process methods are also wrapped by _asset_context() at two levels:
- Process scope: as a contextmanager wrapping from _before_process to _after_process
- Node scope: around the _process_node method
Runner-scoped assets are initialized and deinitialized in TubeRunner.init() and TubeRunner.deinit().
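The two-level wrapping can be pictured with a contextlib sketch. The asset_context function here is a hypothetical stand-in for _asset_context(), whose real signature is not shown on this page:

```python
from contextlib import contextmanager

log: list[str] = []


@contextmanager
def asset_context(scope: str):
    # Stand-in for _asset_context(): set up scope-bound assets, tear down after.
    log.append(f"enter:{scope}")
    try:
        yield
    finally:
        log.append(f"exit:{scope}")


def process(nodes: list[str]) -> None:
    with asset_context("process"):      # wraps _before_process .. _after_process
        for node in nodes:
            with asset_context(node):   # wraps each _process_node call
                pass                    # the node would run here


process(["a", "b"])
# log == ["enter:process", "enter:a", "exit:a", "enter:b", "exit:b", "exit:process"]
```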
- abstractmethod init() → None | Coroutine
Start processing data with the tube graph.
Implementations of this method must:
- Initialize nodes
- Initialize runner-scoped assets
- Raise a TubeRunningError if the tube has already been started and is running (i.e. deinit() has not been called, or the tube has not exhausted itself)
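A conforming implementation might guard against double starts like this. TubeRunningError is named in the docs above, but the running flag, the SketchSyncRunner class, and the helper methods are assumptions for the sake of the sketch:

```python
class TubeRunningError(RuntimeError):
    """Raised when init() is called while the tube is already running."""


class SketchSyncRunner:
    def __init__(self) -> None:
        self.running = False

    def init(self) -> None:
        if self.running:
            # Required by the interface: refuse to start twice
            raise TubeRunningError("tube is already running; call deinit() first")
        self._init_nodes()
        self._init_runner_assets()
        self.running = True

    def deinit(self) -> None:
        self._deinit_nodes()
        self._deinit_runner_assets()
        self.running = False

    # Placeholder phases -- real runners would do actual setup/teardown here.
    def _init_nodes(self) -> None: ...
    def _init_runner_assets(self) -> None: ...
    def _deinit_nodes(self) -> None: ...
    def _deinit_runner_assets(self) -> None: ...
```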
- abstractmethod deinit() → None | Coroutine
Stop processing data with the tube graph.
Implementations of this method must:
- Deinitialize nodes
- Deinitialize runner-scoped assets
- iter(n: int | None = None) → Generator[None | dict[str, Any] | Any, None, None]
Treat the runner as an iterable.
Calls TubeRunner.process() until it yields a result (e.g. multiple times in the case of any gathers that change the cardinality of the graph).
- run(n: int) → list[None | dict[str, Any] | Any]
- run(n: None) → None
Run the tube infinitely or for a fixed number of iterations in a row.
Returns results if n is not None. If n is None, we assume we are going to be running for a very long time, and don't want to have an infinitely-growing collection in memory.
- abstractmethod collect_return(epoch: int | None = None) → None | dict[str, Any] | Any
If any Return nodes are in the tube, gather their return values to return from TubeRunner.process().
- abstractmethod disable_node(node_id: str) → None
Disable a node at runtime.
- get_context() → RunnerContext
- call_async_from_sync(fn: Callable[_PProcess, Coroutine[Any, Any, _TReturn]], executor: ThreadPoolExecutor | None = None, *args: _PProcess.args, **kwargs: _PProcess.kwargs) → _TReturn
Call an async function synchronously, either in this thread or a subthread.
So here’s the deal with this nonsense:
Calling async from sync is easy when there is no running eventloop in the thread
Calling async from sync is almost comically hard when there is a running eventloop.
We are likely to encounter the second case where, e.g., some async application calls some other code that uses a SyncRunner to run a tube that has an async node. asyncio.run() and asyncio.Runner refuse to run when there is a live eventloop, and attempting to use any of the Task or Future spawning methods from the running eventloop like call_soon() or asyncio.run_coroutine_threadsafe() and then polling for the result with asyncio.Future.result() causes a deadlock between the outer sync thread and the eventloop. The basic problem is that there is no way to wait in the thread (synchronously) that yields the thread to the eventloop (which is what async functions are for).

We need to make a new thread in some way. Django's asgiref has a mindblowingly complicated async_to_sync function that works roughly by creating a new thread and then calling asyncio.run() from within that (plus about a thousand other things to manage all the edge cases). That's more than a little bit cursed, because ideally, since the hard case here is where there is already an eventloop in the outer thread, we would be able to just use that eventloop. Normally one would just await the coro directly, which is what AsyncRunner does, but the SyncRunner can't do that because SyncRunner.process() is sync. However, if one creates a new thread with a new eventloop, that will break any stateful nodes that e.g. have objects like asyncio.Event that are bound to the first eventloop.

Until we can figure out how to reuse the outer eventloop, we do the best we can with a modified version of asgiref's approach:

- Create an asyncio.Future to store the eventual result we will return (the "result future")
- Wrap the coroutine to call in another coroutine that calls set_result() or set_exception() on some passed future rather than returning the result directly
- Use a ThreadPoolExecutor to run the wrapped coroutine in a new asyncio.AbstractEventLoop in a separate thread, returning a second Future (the "completion future")
- Add a callback to the completion future that notifies a Condition
- Wait in the main thread for the Condition to be notified
- Return the result from the result future.

The reason we don't just directly return the value of the process coroutine in the inner wrapper coroutine and then return the result of the completion future is error handling: errors raised in the wrapping coroutine have a large amount of noise in the traceback, so instead we use set_exception() to propagate the raised error up to the main thread and raise it there.

- Parameters:
fn – The callable that returns a coroutine to run
executor (concurrent.futures.ThreadPoolExecutor | None) – Provide an already-created thread pool executor. If None, creates one and shuts it down before returning
*args – Passed to fn
**kwargs – Passed to fn
Returns: The result of the called function
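The steps above can be condensed into a runnable sketch. The helper name call_async_from_sync_sketch and the single-use executor are assumptions; the real method also accepts an already-created executor and handles more edge cases:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def call_async_from_sync_sketch(fn, *args, **kwargs):
    """Run the coroutine returned by fn from sync code, even when the
    calling thread already has a running eventloop."""
    loop = asyncio.new_event_loop()
    result_future = loop.create_future()  # the "result future"
    condition = threading.Condition()

    async def wrapper():
        # Capture the result/exception instead of returning it, so errors
        # reach the caller without wrapper-frame noise in the traceback.
        try:
            result_future.set_result(await fn(*args, **kwargs))
        except Exception as exc:
            result_future.set_exception(exc)

    def run_in_thread():
        try:
            loop.run_until_complete(wrapper())
        finally:
            loop.close()

    with ThreadPoolExecutor(max_workers=1) as executor:
        completion = executor.submit(run_in_thread)  # the "completion future"

        def notify(_):
            with condition:
                condition.notify_all()

        completion.add_done_callback(notify)
        with condition:
            # wait_for checks the predicate before blocking, so a notify that
            # fires before we start waiting cannot be missed
            condition.wait_for(completion.done)

    return result_future.result()
```

Note the trade-off described above: because this runs the coroutine on a fresh eventloop in a fresh thread, any state bound to the outer eventloop (e.g. an asyncio.Event created there) will not work inside the call.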