import asyncio
import random
import string
from collections.abc import Generator, Iterator
from datetime import datetime
from itertools import count, cycle
from time import sleep
from typing import Annotated as A
from typing import Any
from faker import Faker
from noob import Name, process_method
from noob.node import Node
[docs]
def count_source(
limit: int = 10000, start: int = 0
) -> Generator[A[int, Name("index")], None, None]:
counter = count(start=start)
if limit == 0:
while True:
yield next(counter)
else:
while (val := next(counter)) < limit:
yield val
[docs]
def letter_source() -> Generator[A[str, Name("letter")]]:
yield from cycle(string.ascii_lowercase)
[docs]
def word_source() -> Generator[A[str, Name("word")]]:
fake = Faker()
while True:
yield fake.word()
[docs]
def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]:
fake = Faker()
while True:
yield [fake.unique.word() for _ in range(n)]
[docs]
def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]:
fake = Faker()
i = 0
while True:
i += 1
if i % every == 0:
yield fake.word()
else:
yield None
[docs]
def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]:
fake = Faker()
while True:
n_counts = random.randint(2, 5)
yield fake.unique.word(), [random.randint(1, 100) for _ in range(n_counts)]
[docs]
def multiply(left: int, right: int = 2) -> int:
"""
Return value purposely unnamed,
to be used as `{nodename}.value`
"""
return left * right
[docs]
def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]:
return numerator / denominator
[docs]
def concat(strings: list[str]) -> str:
return "".join(strings)
[docs]
def exclaim(string: str) -> str:
return string + "!"
[docs]
def repeat(string: str, times: int) -> str:
return string * times
[docs]
def dictify(key: str, items: list[Any]) -> dict[str, Any]:
return {key: items}
[docs]
def error(value: Any) -> None:
raise ValueError("This node just emits errors")
[docs]
class CountSource(Node):
limit: int = 1000
start: int = 0
[docs]
def process(self) -> Generator[A[int, Name("index")], None, None]:
counter = count(start=self.start)
while (val := next(counter)) < self.limit:
yield val
[docs]
class UnannotatedGenerator(Node):
limit: int = 1000
start: int = 0
[docs]
def process(self): # noqa: ANN201
counter = count(start=self.start)
while (val := next(counter)) < self.limit:
yield val
[docs]
class Multiply(Node):
[docs]
def process(self, left: int, right: int = 2) -> A[int, Name("product")]:
return multiply(left=left, right=right)
[docs]
class VolumeProcess:
def __init__(self, height: int = 2):
self.height = height
[docs]
def process(self, width: int, depth: int) -> A[int, Name("volume")]:
return self.height * multiply(left=width, right=depth)
[docs]
class Volume:
def __init__(self, height: int = 2):
self.height = height
[docs]
@process_method
def volume(self, width: int, depth: int) -> A[int, Name("volume")]:
return self.height * multiply(left=width, right=depth)
[docs]
class Now:
def __init__(self):
self.now = datetime.now()
[docs]
@process_method
def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]:
return f"{prefix}{self.now.isoformat()}"
[docs]
class CountSourceDecor:
def __init__(self, start: int = 0) -> None:
self.gen = count(start=start)
[docs]
@process_method
def process(self) -> Generator[A[int, Name("count")], None, None]:
yield from self.gen
[docs]
def long_add(value: float, sleep_for: float = 0.25) -> float:
sleep(sleep_for)
return value + 1
[docs]
async def number_to_letter(number: int, offset: int = 0) -> str:
sleep_for = random.random() / 10
await asyncio.sleep(sleep_for)
return string.ascii_lowercase[(number + offset) % len(string.ascii_lowercase)]
[docs]
class NumberToLetterCls:
def __init__(self, offset: int = 0):
self.offset = offset
[docs]
async def process(self, number: int) -> str:
sleep_for = random.random() / 10
await asyncio.sleep(sleep_for)
return string.ascii_lowercase[(number + self.offset) % len(string.ascii_lowercase)]
[docs]
async def async_error(value: Any) -> None:
"""Just raise an error!"""
raise ValueError("This is the error that should be raised")
[docs]
class StatefulMultiply:
def __init__(self, start: int = 0) -> None:
self.start = start
self.current = self.start
[docs]
def process(self, left: float, right: float = 1) -> float:
value = left * right * self.current
self.current += 1
return value
[docs]
def fast_forward(generator: count, n: int = 1) -> tuple[A[int, Name("next")]]:
for _ in range(n):
val = next(generator)
return val
[docs]
def jump(generator: count, n: int = 1) -> tuple[A[count, Name("skirttt")], A[int, Name("next")]]:
value = 0
for _ in range(n):
value = next(generator)
return generator, value
[docs]
def rewind(generator: count, n: int = 1) -> A[count, Name("skrittt")]:
"""Purposely designed to mutate the input and return a new object"""
return count(next(generator) - n)
[docs]
def zip_iter(*args: Iterator) -> tuple[Any, ...]:
return tuple(next(a) for a in args)
[docs]
def increment(
iterator: Iterator[int],
) -> tuple[A[Iterator[int], Name("iterator")], A[int, Name("value")]]:
value = next(iterator)
return iterator, value
[docs]
def passthrough(value: Any) -> Any:
return value
[docs]
class InitCounter(Node):
"""Count how many times we have been initialized and deinitalized"""
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self._inits = 0
self._deinits = 0
[docs]
def process(self) -> tuple[A[int, Name("inits")], A[int, Name("deinits")]]:
# sleep to just not have this flood the networking modules.
sleep(0.01)
return self._inits, self._deinits
[docs]
def init(self) -> None:
self._inits += 1
[docs]
def deinit(self) -> None:
self._deinits += 1