base

class TubeRunner(tube: Tube, store: EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None)[source]

Abstract parent class for tube runners.

Tube runners handle calling the nodes and passing the events returned by them to each other. Each runner may do so however it needs to (synchronously, asynchronously, alone or as part of a cluster, etc.) as long as it satisfies this abstract interface.

tube: Tube
store: EventStore
max_iter_loops: int = 100

The maximum number of times that iter() will call process() to try to get a result.

property runner_id: str
process(**kwargs: Any) → None | dict[str, Any] | Any[source]

Process one step of data from each of the sources, passing intermediate data to any subscribed nodes in a chain.

The process method normally does not return anything, except when using the special Return node.

Process-scoped inputs can be passed as kwargs.

The base process method is implemented as a series of lifecycle methods and hooks corresponding to different stages of a process call. Subclasses may override each of these methods to customize runner behavior. Subclasses may also override the TubeRunner.process() method itself, but must ensure that the phases of the base process method are executed.

The methods invoked, in order (see the docstring of each for further explanation):

  • _validate_input()

  • _before_process()

  • _filter_ready()

  • _get_node()

  • _collect_input()

  • _before_call_node()

  • _call_node()

  • _after_call_node()

  • _handle_events()

  • _after_process()

  • collect_return()

Process methods are also wrapped by _asset_context() at two levels:

  • Process scope: as a context manager wrapping from _before_process() to _after_process()

  • Node scope: around the _process_node method

Runner-scoped assets are initialized and deinitialized in TubeRunner.init() and TubeRunner.deinit().
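The phase ordering above can be sketched with a toy stand-in class. This is an illustration only, not the real TubeRunner: the actual hooks do real work (validation, node lookup, event routing, asset contexts, and per-node loops), whereas here each hook just records its name so the documented sequence is visible.

```python
from typing import Any


class SketchRunner:
    """Toy stand-in showing the lifecycle ordering of TubeRunner.process().

    Each hook records its own name; the real hooks do actual work.
    """

    def __init__(self) -> None:
        self.calls: list[str] = []

    def _validate_input(self, **kwargs: Any) -> None:
        self.calls.append("_validate_input")

    def _before_process(self) -> None:
        self.calls.append("_before_process")

    def _filter_ready(self) -> None:
        self.calls.append("_filter_ready")

    def _get_node(self) -> None:
        self.calls.append("_get_node")

    def _collect_input(self) -> None:
        self.calls.append("_collect_input")

    def _before_call_node(self) -> None:
        self.calls.append("_before_call_node")

    def _call_node(self) -> None:
        self.calls.append("_call_node")

    def _after_call_node(self) -> None:
        self.calls.append("_after_call_node")

    def _handle_events(self) -> None:
        self.calls.append("_handle_events")

    def _after_process(self) -> None:
        self.calls.append("_after_process")

    def collect_return(self) -> Any:
        self.calls.append("collect_return")
        return None  # no Return sinks in this sketch

    def process(self, **kwargs: Any) -> Any:
        # The documented phase order; overriding subclasses must keep it.
        self._validate_input(**kwargs)
        self._before_process()
        self._filter_ready()
        self._get_node()
        self._collect_input()
        self._before_call_node()
        self._call_node()
        self._after_call_node()
        self._handle_events()
        self._after_process()
        return self.collect_return()
```

A subclass that needs custom behavior at one stage overrides just that hook (e.g. `_before_call_node`) rather than reimplementing `process()` wholesale.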

abstractmethod init() → None | Coroutine[source]

Start processing data with the tube graph.

Implementations of this method must

  • Initialize nodes

  • Initialize runner-scoped assets

  • Raise a TubeRunningError if the tube has already been started and is running (i.e. deinit() has not been called, or the tube has not exhausted itself)

abstractmethod deinit() → None | Coroutine[source]

Stop processing data with the tube graph.

Implementations of this method must

  • Deinitialize nodes

  • Deinitialize runner-scoped assets
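A minimal sketch of the init()/deinit() contract described above. The `TubeRunningError` class and the node objects here are hypothetical stand-ins for the library's own types, and runner-scoped asset handling is reduced to a comment:

```python
class TubeRunningError(RuntimeError):
    """Hypothetical stand-in for the library's TubeRunningError."""


class MiniRunner:
    """Sketch of the init()/deinit() requirements for a runner subclass."""

    def __init__(self, nodes: list) -> None:
        self.nodes = nodes
        self._running = False

    @property
    def running(self) -> bool:
        return self._running

    def init(self) -> None:
        if self._running:
            # Contract: refuse to start a tube that is already running.
            raise TubeRunningError("tube is already running")
        for node in self.nodes:
            node.init()          # initialize nodes
        self._running = True     # runner-scoped assets would be set up here

    def deinit(self) -> None:
        for node in self.nodes:
            node.deinit()        # deinitialize nodes
        self._running = False    # runner-scoped assets torn down here
```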

iter(n: int | None = None) → Generator[None | dict[str, Any] | Any, None, None][source]

Treat the runner as an iterable.

Calls TubeRunner.process() until it yields a result (e.g. multiple times in the case of any gathers that change the cardinality of the graph).

run(n: int) → list[None | dict[str, Any] | Any][source]
run(n: None) → None

Run the tube for a fixed number of iterations in a row, or indefinitely.

Returns results if n is not None. If n is None, we assume we are going to be running for a very long time and don’t want an infinitely-growing collection in memory.

abstract property running: bool

Whether the tube is currently running

abstractmethod collect_return(epoch: int | None = None) → None | dict[str, Any] | Any[source]

If any Return nodes are in the tube, gather their return values to return from TubeRunner.process()

Returns:

A dict of each Return sink’s key mapped to its returned value, or None if there are no Return sinks in the tube.

Return type:

dict | None

add_callback(callback: Callable[[Event | MetaEvent], None]) → None[source]
abstractmethod enable_node(node_id: str) → None[source]

Enable a node at runtime.

abstractmethod disable_node(node_id: str) → None[source]

Disable a node at runtime.

get_context() → RunnerContext[source]
inject_context(fn: Callable) → Callable[source]

Wrap a function in a partial with the runner context injected, if requested.
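One plausible reading of "if requested" is that the wrapped function declares a context parameter by name. This sketch makes that assumption explicit; the parameter name `context` and the detection mechanism are illustrative guesses, not the library's actual convention:

```python
import functools
import inspect
from collections.abc import Callable


def inject_context(fn: Callable, context: object) -> Callable:
    """Sketch: bind the runner context via partial, but only when the
    function asks for it (here: by declaring a 'context' parameter)."""
    if "context" in inspect.signature(fn).parameters:
        return functools.partial(fn, context=context)
    return fn  # function didn't request the context; leave it untouched
```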

call_async_from_sync(fn: Callable[_PProcess, Coroutine[Any, Any, _TReturn]], executor: ThreadPoolExecutor | None = None, *args: _PProcess.args, **kwargs: _PProcess.kwargs) → _TReturn[source]

Call an async function synchronously, either in this thread or a subthread.

So here’s the deal with this nonsense:

  • Calling async from sync is easy when there is no running eventloop in the thread

  • Calling async from sync is almost comically hard when there is a running eventloop.

We are likely to encounter the second case where, e.g., some async application calls some other code that uses a SyncRunner to run a tube that has an async node.

asyncio.run() and asyncio.Runner refuse to run when there is a live eventloop, and attempting to use any of the Task or Future spawning methods from the running eventloop like call_soon() or asyncio.run_coroutine_threadsafe() and then polling for the result with asyncio.Future.result() causes a deadlock between the outer sync thread and the eventloop. The basic problem is that there is no way to wait in the thread (synchronously) that yields the thread to the eventloop (which is what async functions are for).

We need to make a new thread in some way. Django’s asgiref has a mindblowingly complicated async_to_sync function that works roughly by creating a new thread and then calling asyncio.run() from within that (plus about a thousand other things to manage all the edge cases). That’s more than a little bit cursed, because ideally, since the hard case here is where there is already an eventloop in the outer thread, we would be able to just use that eventloop. Normally one would just await the coro directly, which is what AsyncRunner does, but the SyncRunner can’t do that because SyncRunner.process() is sync.

However, if one creates a new thread with a new eventloop, that will break any stateful nodes that e.g. have objects like asyncio.Event that are bound to the first eventloop.

Until we can figure out how to reuse the outer eventloop, we do the best we can with a modified version of asgiref’s approach.

  • Create an asyncio.Future to store the eventual result we will return (the “result future”)

  • Wrap the coroutine to call in another coroutine that calls set_result() or set_exception() on some passed future rather than returning the result directly

  • Use a ThreadPoolExecutor to run the wrapped coroutine in a new asyncio.AbstractEventLoop in a separate thread, returning a second Future (the “completion future”)

  • Add a callback to the completion future that notifies a Condition

  • Wait in the main thread for the Condition to be notified

  • Return the result from the result future.

The reason we don’t just directly return the value of the process coroutine in the inner wrapper coroutine and then return the result of the completion future is error handling: errors raised in the wrapping coroutine have a large amount of noise in the traceback, so instead we use set_exception() to propagate the raised error up to the main thread and raise it there.

Parameters:
  • fn – The callable that returns a coroutine to run

  • executor (concurrent.futures.ThreadPoolExecutor | None) – Provide an already-created thread pool executor. If None, creates one and shuts it down before returning

  • *args – Passed to fn

  • **kwargs – Passed to fn

Returns: The result of the called function

References