Tubes

Noob tubes are collections of nodes and their connections that form a processing graph.

Tubes have two representations:

  • A TubeSpecification - a yaml[1]-serializable form that specifies the abstract structure of the tube

  • A Tube - the in-memory, instantiated form of a tube that contains instantiated Node objects and the rest of the machinery used to run a tube.

We encourage people to write tubes as specifications and load them rather than defining tubes programmatically, though it is possible to do so: one of the goals of noob is to support portable, publishable, lockable[2] processing graphs that can be inspected without a Python interpreter.

Tube Specifications

Tube specifications consist of a few sections:

  • A header with metadata that identifies the tube

  • A nodes dictionary that defines the nodes and their dependencies

  • An optional input dictionary that defines runtime input to the tube

  • An optional assets dictionary that defines data objects that can persist across multiple runs of the same tube.

Nodes

The nodes dictionary is the heart of a tube.

Recall the signals and slots of a node. We can declare a set of nodes and the connectivity between the nodes like this:

noob_id: docs-linear
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1

nodes:
  a:
    type: pkg.node_a
  b:
    type: pkg.node_b
    depends:
      - slot_a: a.value
  c:
    type: pkg.node_c
    depends:
      - slot_b: b.value

That looks like this:

The basic requirement of a node entry is the type specification, which is an absolute specifier of some node function or class within some Python package in the environment. Throughout these examples, we’ll assume all code is written in the root of some Python package named pkg.

Dependencies

Connections between nodes are specified as dependencies (with the depends key) - a target node specifies the signals from a source node to plug into its slots. This lets tubes grow! A node knows what it needs to run, but it shouldn’t know anything about what other nodes do with its outputs, so new nodes that consume its outputs can be added freely.

Dependencies are typically specified as a list of dictionaries, each mapping a signal from another node to one of the target node’s slots.

So

depends:
- slot_a: a.value 

maps the value signal of node a to slot_a of node b. Or, roughly,

node_b(slot_a=node_a())

Dependencies for positional-only arguments can be specified with a list of string dependencies, or with a list of dictionaries that have integer keys. The following are equivalent:

depends:
- a.value
- b.value
- c.value

depends:
- 0: a.value
- 1: b.value
- 2: c.value
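To make the mapping concrete, here is a minimal sketch of how a depends list could be turned into call arguments. resolve_depends is a hypothetical helper, not part of noob’s API, and the outputs dict (keyed by "node.signal" strings) is an assumed stand-in for however noob actually stores the events emitted in an epoch.

```python
from typing import Any


def resolve_depends(
    depends: list, outputs: dict[str, Any]
) -> tuple[list[Any], dict[str, Any]]:
    """Hypothetical helper: turn a depends list into (args, kwargs).

    `outputs` maps "node.signal" strings to the events emitted this epoch.
    """
    positional: dict[int, Any] = {}
    kwargs: dict[str, Any] = {}
    for position, entry in enumerate(depends):
        if isinstance(entry, str):
            # a bare signal string is positional, in list order
            positional[position] = outputs[entry]
            continue
        for key, signal in entry.items():
            if isinstance(key, int):
                # integer keys give an explicit position
                positional[key] = outputs[signal]
            else:
                # string keys name the slot (a keyword argument)
                kwargs[key] = outputs[signal]
    args = [positional[i] for i in sorted(positional)]
    return args, kwargs


outputs = {"a.value": 1, "b.value": 2, "c.value": 3}
print(resolve_depends(["a.value", "b.value", "c.value"], outputs))  # ([1, 2, 3], {})
print(resolve_depends([{0: "a.value"}, {1: "b.value"}, {2: "c.value"}], outputs))  # ([1, 2, 3], {})
print(resolve_depends([{"slot_a": "a.value"}], outputs))  # ([], {'slot_a': 1})
```

Both positional forms resolve to the same argument list, which is why the two depends blocks above are interchangeable.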

Params

Params are static parameters that don’t change, and are part of the tube’s definition.

  • For function nodes, params effectively[3] work like a partial(): the parameters are always passed to the function when it is called.

  • For class nodes, params are passed to the class’s __init__ method.

  • For generator nodes, params are passed when the generator is created (generators cannot have dependencies).

So, for example, to fix some behavior of a function node, such as turning the following multiply node into a “multiply by 2” node, one might fill one of its slots with a param and the other with a dependency:

def multiply(left: float, right: float) -> float:
    return left * right

nodes:
  int_source:
    type: itertools.count
  multiply_by_2:
    type: pkg.multiply
    depends:
      - left: int_source.value
    params:
      right: 2
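In plain Python, the tube above behaves roughly like the following sketch. It only illustrates the partial()-like behavior of params; it assumes that int_source emits one value per epoch and is not how noob runs nodes internally.

```python
from functools import partial
from itertools import count, islice


def multiply(left: float, right: float) -> float:
    return left * right


# function node: the `right: 2` param is bound once, much like functools.partial
multiply_by_2 = partial(multiply, right=2)

# generator-like source: itertools.count() yields 0, 1, 2, ...
int_source = count()

# each list element stands in for one epoch: int_source's value feeds `left`
results = [multiply_by_2(left=value) for value in islice(int_source, 4)]
print(results)  # [0, 2, 4, 6]
```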

Input

Inputs are variable parameters that can change, either each time the tube is created or each time the tube is run.

Inputs have a scope (InputScope) that defines when they must be passed.

  • tube-scoped inputs have the same lifespan as a tube: they are passed when creating the tube.

  • process-scoped inputs have the same lifespan as a process() call: they are passed every time process() is called.

Tube-scoped inputs

For example, a common need for pipelines is to run some operation over files within a directory.

Say we used some file iterator node like this:

from pathlib import Path
from typing import Generator

def iter_directory(path: Path) -> Generator[Path, None, None]:
    for file in path.iterdir():
        if file.is_file():
            yield file

Then we could specify that operation as a tube with a tube-scoped directory input like this:

noob_id: process-directory

input:
  directory:
    scope: tube
    type: pathlib.Path

nodes:
  file:
    type: pkg.iter_directory
    params:
      path: input.directory
  do_something:
    type: pkg.do_something
    depends:
      - a_file: file.value
  # ...other nodes...

We would then pass the directory when instantiating the tube from its specification:

from pathlib import Path

tube = Tube.from_specification(
    'process-directory',
    input={"directory": Path("my_data")},
)
runner = SyncRunner(tube)

for result in runner.iter():
    ...  # do something with the result
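As a rough plain-Python analogy (not how noob or SyncRunner is implemented), the tube above behaves something like this: the tube-scoped directory is bound once, when the generator node is created, and each epoch pulls one file from it. We assume one file per epoch, and do_something here is a hypothetical stand-in for pkg.do_something.

```python
import tempfile
from pathlib import Path
from typing import Generator


def iter_directory(path: Path) -> Generator[Path, None, None]:
    for file in path.iterdir():
        if file.is_file():
            yield file


def do_something(a_file: Path) -> str:
    # hypothetical stand-in for pkg.do_something
    return a_file.name.upper()


def run_process_directory(directory: Path) -> list[str]:
    # "tube creation": the tube-scoped input is bound once, when the
    # generator node is created with its params
    file_node = iter_directory(directory)
    # each iteration stands in for one epoch
    return [do_something(a_file) for a_file in file_node]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_text("a")
    (root / "b.txt").write_text("b")
    (root / "subdir").mkdir()  # skipped: not a file
    print(sorted(run_process_directory(root)))  # ['A.TXT', 'B.TXT']
```

Directory iteration order is not guaranteed, so the results are sorted before printing.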

Process-scoped inputs

If instead we wanted to vary some parameter every time we ran the tube, we could specify the input with a process scope.

For example, to pass a new path manually each time, we could write:

noob_id: process-directory-manual

input:
  path:
    scope: process
    type: pathlib.Path

and then provide the path on each process() call:

from pathlib import Path

tube = Tube.from_specification('process-directory-manual')
runner = SyncRunner(tube)

for path in Path("my_data").iterdir():
    result = runner.process(path=path)

Mixing Scopes, Depends, Params

Input scopes interact with the implicit scopes of depends and params:

  • tube-scoped inputs may be used for params, since params is a tube-level specification too:

    input:
      a:
        scope: tube

    nodes:
      b:
        params:
          c: input.a

  • process-scoped inputs may not be used as params, since they are not defined at the time params are used!

  • tube-scoped inputs and process-scoped inputs may be used as dependencies.
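The params rule can be checked statically from the specification alone. Below is a minimal sketch of such a check; check_param_inputs is hypothetical (noob’s own validation may differ), and the spec is assumed to be a plain dict as produced by parsing the YAML.

```python
def check_param_inputs(spec: dict) -> list[str]:
    """Hypothetical validator: params may only reference tube-scoped inputs."""
    scopes = {
        name: field.get("scope") for name, field in (spec.get("input") or {}).items()
    }
    errors = []
    for node_id, node in (spec.get("nodes") or {}).items():
        for param, value in (node.get("params") or {}).items():
            if not (isinstance(value, str) and value.startswith("input.")):
                continue  # a literal value, not an input reference
            name = value[len("input."):]
            if name not in scopes:
                errors.append(f"{node_id}.{param}: unknown input {name!r}")
            elif scopes[name] != "tube":
                errors.append(f"{node_id}.{param}: input {name!r} is {scopes[name]}-scoped")
    return errors


ok = {"input": {"a": {"scope": "tube"}}, "nodes": {"b": {"params": {"c": "input.a"}}}}
bad = {"input": {"a": {"scope": "process"}}, "nodes": {"b": {"params": {"c": "input.a"}}}}
print(check_param_inputs(ok))   # []
print(check_param_inputs(bad))  # ["b.c: input 'a' is process-scoped"]
```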

Additionally, a tube-scoped input may be overridden by an input to a process call - the most local scope prevails.

noob_id: mixed-scope
input:
  a:
    scope: tube

nodes:
  b:
    type: return
    depends: input.a

tube = Tube.from_specification('mixed-scope', input={'a': 'x'})
runner = SyncRunner(tube)
result = runner.process(a="y")

print(result)
# y
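The “most local scope prevails” rule amounts to a dictionary merge where process-level values win. A minimal sketch, assuming a hypothetical resolve_inputs helper (not part of noob’s API) that also rejects process() calls missing a required process-scoped input:

```python
from typing import Any


def resolve_inputs(
    spec: dict[str, dict],
    tube_input: dict[str, Any],
    process_input: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: the inputs visible during one process() call.

    `spec` is the tube's `input` section; process_input overrides tube_input.
    """
    missing = [
        name
        for name, field in spec.items()
        if field.get("scope") == "process" and name not in process_input
    ]
    if missing:
        raise ValueError(f"missing process-scoped inputs: {missing}")
    # the most local scope prevails: process-level values win on conflicts
    return {**tube_input, **process_input}


spec = {"a": {"scope": "tube"}}
print(resolve_inputs(spec, {"a": "x"}, {"a": "y"}))  # {'a': 'y'}
print(resolve_inputs(spec, {"a": "x"}, {}))  # {'a': 'x'}
```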

Assets

Tip

See the assets documentation

Returning Values

Tubes have inputs, and for whatever reason people usually like to “do things” with the result of a pipeline, so tubes can also return values.

The return node is a special node (see: SPECIAL_NODES) that collects events from a tube to be returned from a process call. Only one return node may be defined for a tube.

The return node uses the structure of its depends block to destructure the returned values:

For a scalar return value, specify depends as a single signal (a string, not a list):

nodes:
  # ...
  return:
    type: return
    depends: a.value

runner.process()
# 0

For a list of returned values, specify depends as a list of signals

nodes:
  # ...
  return:
    type: return
    depends:
      - a.value
      - b.value
      - c.value

runner.process()
# [0, 1, 2]

For a dictionary of returned values, specify depends as a list of key/value pairs:

nodes:
  # ...
  no_broke_boys:
    type: return
    depends:
      - no: a.value
      - new: b.value
      - friends: c.value

runner.process()
# {"no": 0, "new": 1, "friends": 2}
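The three return shapes can be summarized in one small function. This is a sketch of the destructuring rules described above, not noob’s implementation; collect_return is hypothetical, and events is an assumed dict of this epoch’s events keyed by "node.signal".

```python
from typing import Any, Union


def collect_return(depends: Union[str, list], events: dict[str, Any]) -> Any:
    """Hypothetical sketch of return-node destructuring.

    `events` maps "node.signal" strings to this epoch's events.
    """
    if isinstance(depends, str):
        # a single signal -> a scalar
        return events[depends]
    if all(isinstance(entry, str) for entry in depends):
        # a list of signals -> a list, in order
        return [events[signal] for signal in depends]
    if all(isinstance(entry, dict) for entry in depends):
        # a list of key/value pairs -> a dict
        return {
            key: events[signal]
            for entry in depends
            for key, signal in entry.items()
        }
    raise ValueError("mix of bare signals and key/value pairs")


events = {"a.value": 0, "b.value": 1, "c.value": 2}
print(collect_return("a.value", events))  # 0
print(collect_return(["a.value", "b.value", "c.value"], events))  # [0, 1, 2]
print(collect_return([{"no": "a.value"}, {"new": "b.value"}, {"friends": "c.value"}], events))
# {'no': 0, 'new': 1, 'friends': 2}
```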

Patterns

noob supports the basic patterns one would expect of a DAG processing library, including the cardinality-manipulating operations needed to bridge arbitrary graph computation and practical programming conventions:

  • merge - Combine signals from multiple upstream nodes in a single node

  • branch - Split a signal from a single node to the slots of several nodes

  • gather - Reduce cardinality: collect events emitted across multiple epochs into a single event

Merge

Multiple signals can be merged as inputs to a single node’s slots:

noob_id: docs-merge
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
nodes:
  a:
    type: noob.testing.count_source
  b:
    type: noob.testing.count_source
    params:
      start: 5
  c:
    type: noob.testing.multiply
    depends:
    - left: a.index
  d:
    type: noob.testing.divide
    depends:
    - numerator: c.value
    - denominator: b.index
  e:
    type: return
    depends: 
      - ratio: d.ratio

Branch

An event from a single signal can be branched and fed to multiple nodes:

noob_id: docs-branch
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
nodes:
  a:
    type: noob.testing.count_source
  b:
    type: noob.testing.multiply
    depends:
      - left: a.index
  c:
    type: noob.testing.divide
    depends:
      - numerator: a.index
  d:
    type: return
    depends:
      - multiply: b.value
      - divide: c.ratio

Gather

Events from multiple rounds of calling the process method (epochs) can be gathered as input to another node in two ways. Both use the special gather node.

Gather n

With a fixed n value in the gather node’s params, the gather node collects n events from the node it depends on and then emits them as a list [e_1, e_2, ..., e_n] to the node that depends on it.

noob_id: docs-gather-n
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
nodes:
  a:
    type: noob.testing.letter_source
  b:
    type: gather
    params:
      n: 5
    depends:
      - value: a.letter
  c:
    type: noob.testing.concat
    depends:
      - b.value
  d:
    type: return
    depends:
      - word: c.value

Here, the letter_source node emits individual letters, the gather node collects five of them at a time, and the concat node joins them back together.

The output of each node, per epoch, looks like:

epoch  a    b                          c
0      "a"
1      "b"
2      "c"
3      "d"
4      "e"  ['a', 'b', 'c', 'd', 'e']  'abcde'
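A gather-n node is essentially a buffer that flushes every n events. The class below is a hypothetical sketch of that behavior, not noob’s gather implementation; in particular, using None to mean “no event emitted this epoch” is an assumption made for the sketch.

```python
from typing import Any, Optional


class GatherN:
    """Hypothetical sketch of a gather node with a fixed n."""

    def __init__(self, n: int):
        self.n = n
        self.collected: list[Any] = []

    def process(self, value: Any) -> Optional[list[Any]]:
        self.collected.append(value)
        if len(self.collected) < self.n:
            return None  # no event emitted this epoch
        emitted, self.collected = self.collected, []
        return emitted


gather = GatherN(n=5)
for epoch, letter in enumerate("abcde"):
    gathered = gather.process(letter)
    word = "".join(gathered) if gathered is not None else None  # like concat
    print(epoch, letter, gathered, word)
# 0 a None None
# 1 b None None
# 2 c None None
# 3 d None None
# 4 e ['a', 'b', 'c', 'd', 'e'] abcde
```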

Gather trigger

The gather node can also depend on another node’s output as a trigger, emitting the events it has collected since the last trigger.

noob_id: docs-gather-dependent
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
nodes:
  a1:
    type: noob.testing.count_source
  a2:
    type: noob.testing.sporadic_word
  b:
    type: gather
    depends:
      - value: a1.index
      - trigger: a2.word
  c:
    type: noob.testing.dictify
    depends:
      - key: a2.word
      - items: b.value
  d:
    type: return
    depends:
    - word: c.value

Here, the gather node collects numbers from the a1 count source until the a2 sporadic_word node emits a value. The dictify node then converts the collected numbers into a dictionary, using a2’s word as the key.

A set of runs might look like:

epoch  a1  a2             b             c
0      0
1      1
2      2
3      3   "electricity"  [0, 1, 2, 3]  {"electricity": [0, 1, 2, 3]}
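The triggered variant differs only in its flush condition: it flushes whenever the trigger slot receives an event, and the emitted list includes the current epoch’s value, as in epoch 3 of the table. As before, this is a hypothetical sketch that uses None for “no event”; GatherTrigger and the local dictify function are stand-ins, not noob’s classes.

```python
from typing import Any, Optional


class GatherTrigger:
    """Hypothetical sketch of a gather node that emits when triggered."""

    def __init__(self) -> None:
        self.collected: list[Any] = []

    def process(self, value: Any, trigger: Optional[Any] = None) -> Optional[list[Any]]:
        self.collected.append(value)
        if trigger is None:
            return None  # keep collecting
        # the trigger fired: emit everything since the last trigger, this epoch included
        emitted, self.collected = self.collected, []
        return emitted


def dictify(key: str, items: list[Any]) -> dict[str, list[Any]]:
    return {key: items}


gather = GatherTrigger()
words = [None, None, None, "electricity"]  # a2 emits only on epoch 3
for epoch, word in enumerate(words):
    gathered = gather.process(epoch, trigger=word)
    if gathered is not None:
        print(epoch, dictify(word, gathered))
# 3 {'electricity': [0, 1, 2, 3]}
```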

Map

Warning

Map has not been implemented yet!

It is a slightly more complicated problem because of some ambiguities that map-like specifications have that gather-like specifications don’t.

See: #61, #29

Map would spread a single iterable event out, passing each of its items to a given node within an epoch. This is useful for transforming data as well as for parallelization…

Nesting

Tubes have inputs and return values, so what is a tube but a node?

A special tube node allows tubes to be nested within one another:

Say you have a “child” tube like this:

noob_id: docs-recursive-child
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
input:
  child_start:
    scope: tube
    type: int
  child_multiply_inner:
    scope: process
    type: int
  child_multiply_input:
    scope: process
    type: int
nodes:
  a:
    type: noob.testing.count_source
    params:
      start: input.child_start
  b:
    type: noob.testing.multiply
    depends:
      - left: a.index
      - right: input.child_multiply_inner
  c:
    type: noob.testing.multiply
    depends:
    - left: b.value
    - right: input.child_multiply_input
  d:
    type: return
    depends:
    - value: c.value

You can include it in some “parent” tube like this:

noob_id: docs-recursive-parent
noob_model: noob.tube.TubeSpecification
noob_version: 0.0.1
input:
  parent_start:
    scope: tube
    type: int
  parent_multiply:
    scope: process
    type: int
  child_start:
    scope: tube
    type: int
  child_multiply:
    scope: process
    type: int
nodes:
  a:
    type: noob.testing.count_source
    params:
      start: input.parent_start

  b:
    type: tube
    params:
      tube: docs-recursive-child
    depends:
      - child_multiply_inner: a.index
      - child_multiply_input: input.child_multiply

  c:
    type: noob.testing.multiply
    depends:
    - left: b.value
    - right: input.parent_multiply
  d:
    type: return
    depends:
      - index: a.index
      - child: b.value
      - parent: c.value
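As a loose plain-Python analogy for “a tube is just a node”, each tube can be pictured as a closure: tube-scoped inputs are bound when it is created, and process-scoped inputs are arguments to each call. This sketch rests on several assumptions not confirmed above: that count_source emits start, start + 1, ... once per epoch; that the parent’s child_start is forwarded to the child; and that the parent’s b.value refers to the child’s value return key. make_child and make_parent are hypothetical names.

```python
from itertools import count
from typing import Callable


def make_child(child_start: int) -> Callable[..., dict]:
    """Rough analogy for docs-recursive-child (not noob's implementation)."""
    counter = count(child_start)  # tube scope: created once

    def process(child_multiply_inner: int, child_multiply_input: int) -> dict:
        index = next(counter)  # assumed: a.index advances once per epoch
        b = index * child_multiply_inner
        c = b * child_multiply_input
        return {"value": c}

    return process


def make_parent(parent_start: int, child_start: int) -> Callable[..., dict]:
    """Rough analogy for docs-recursive-parent: the child is just another node."""
    counter = count(parent_start)
    child = make_child(child_start)  # assumed: child_start is forwarded

    def process(parent_multiply: int, child_multiply: int) -> dict:
        index = next(counter)
        child_value = child(
            child_multiply_inner=index, child_multiply_input=child_multiply
        )["value"]  # assumed: b.value is the child's `value` return
        return {"index": index, "child": child_value, "parent": child_value * parent_multiply}

    return process


parent = make_parent(parent_start=1, child_start=10)
print(parent(parent_multiply=2, child_multiply=3))  # {'index': 1, 'child': 30, 'parent': 60}
```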

Disabling Nodes

Todo

Document disabling nodes