Module datatap.utils

A collection of primarily internal-use utilities.

Expand source code
"""
A collection of primarily internal-use utilities.
"""

from .environment import Environment
from .helpers import assert_one, timer, DeletableGenerator
from .cache_generator import CacheGenerator
from .or_nullish import OrNullish
from .print_helpers import basic_repr, color_repr, force_pretty_print, pprint, pprints

# Names re-exported as the public surface of `datatap.utils`, listed in the
# same order as the imports above (grouped by originating sub-module).
__all__ = [
    # .environment
    "Environment",
    # .helpers
    "assert_one",
    "timer",
    "DeletableGenerator",
    # .cache_generator
    "CacheGenerator",
    # .or_nullish
    "OrNullish",
    # .print_helpers
    "basic_repr",
    "color_repr",
    "force_pretty_print",
    "pprint",
    "pprints",
]

Sub-modules

datatap.utils.cache_generator
datatap.utils.environment
datatap.utils.helpers
datatap.utils.or_nullish
datatap.utils.print_helpers

Functions

def CacheGenerator(file_name: str, create_stream: Callable[[], Generator[_T, Any, Any]]) ‑> Generator[~_T, NoneType, NoneType]
Expand source code
def CacheGenerator(file_name: str, create_stream: Callable[[], Generator[_T, Any, Any]]) -> Generator[_T, None, None]:
    """
    Yields the elements of a stream, spooling them through a file on disk.

    If `file_name` already exists, `create_stream` is never called: the file
    is read line by line and every non-blank line other than the `EOF` marker
    is JSON-decoded and yielded.

    Otherwise a background thread is started immediately. It calls
    `create_stream()` and appends each element, JSON-encoded, as one line of
    `{file_name}.stream`. The returned generator reads those lines back and
    yields the decoded elements. Once the `EOF` marker has been read, any
    exception raised while iterating the stream is re-raised to the consumer.

    In the streaming case the result is wrapped in a `DeletableGenerator`
    whose deletion sets a flag that makes the writer thread stop at its next
    element.
    """
    # We can't just naively stream from the server, unfortunately. Due to the sheer
    # volume of data, and the fact that training can be such a slow process, if we
    # try to stream the data directly from server to training process, we will end
    # up filling the OS' buffer, causing significant backpressure for the server.
    #
    # Likewise, we cannot necessarily stream from the server into a local buffer,
    # as a large dataset could be greater than our available RAM.
    #
    # As a result, this method streams data from the server to a temp file in a
    # subprocess. The main process then streams from that tempfile to the consumer
    # of the stream. Finally, once all data has been read, the main process stores
    # the stream file as an authoritative cache file for this particular stream.
    # Subsequent calls to this function with the same arguments will then pull from
    # that file.
    #
    # Please note that as a result of file-system accesses, streaming in this manner
    # incurs a non-trivial performance cost. For production training jobs, it is
    # recommended that this function be used with a data-loader capable of running
    # on multiple threads.
    #
    # NOTE(review): nothing in this function renames or copies the `.stream`
    # file to `file_name`; confirm where (or whether) that promotion happens.

    # TODO(zwade): change this to UID once we have an endpoint for fetching it
    dir_name = path.dirname(file_name)
    tmp_file_name = f"{file_name}.stream"
    # `path.dirname` returns "" for a bare file name, and `os.makedirs("")`
    # raises, so only create the directory when there is one to create.
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    EOF = "EOF"

    # Checks for an authoritative cache, using it if it exists.
    if path.exists(file_name):
        def cache_generator():
            with open(file_name, "r") as f:
                # Iterate the file lazily rather than via `readlines()`, so a
                # cache larger than the available RAM can still be replayed.
                for line in f:
                    line = line.strip()
                    if line == "" or line == EOF:
                        continue
                    yield json.loads(line)
        return cache_generator()


    # `sync_queue` is used to synchronize startup and termination of the
    # subprocess, optionally propagating any errors that arise.
    sync_queue: Queue[Optional[Exception]] = Queue()

    # `available_annotations` counts how many lines have been written to
    # the stream file that have not yet been consumed.
    #
    # Note that `Semaphore()` starts with a value of 1, so the consumer's
    # first `acquire` can succeed before anything has been written; the
    # polling loop in `generator` covers that case.
    available_annotations = Semaphore()

    # `dead` is a flag that allows us to terminate our stream early
    dead = False

    def stream_target():
        # NOTE(review): if `create_stream()` or `open` raises here, the startup
        # signal below is never sent and the consumer blocks on `sync_queue.get()`.
        stream = create_stream()

        # The file is opened in append mode, so a stream file left over from
        # an earlier run is extended rather than truncated.
        with open(tmp_file_name, "a+") as f:
            # Startup signal: the stream file now exists and can be opened
            # for reading by the consumer.
            sync_queue.put(None)
            try:
                for element in stream:
                    if dead:
                        raise Exception("Premature termination")

                    # We want to prioritize reading quickly, so after we write, we
                    # flush to the disk.
                    #
                    # (Note that we do not synchronize, as `fsync` incurs a 10x
                    # slowdown)
                    f.write(json.dumps(element) + "\n")
                    f.flush()
                    # We then "release" our semaphore to indicate that we've made a
                    # new asset available to the consumer
                    available_annotations.release()

                # Termination signal: the stream was exhausted without error.
                sync_queue.put(None)
            except Exception as e:
                # Termination signal carrying the failure to the consumer.
                sync_queue.put(e)
            finally:
                # We explicitly write "EOF" at the end of the stream, since we
                # otherwise would not be able to distinguish between the actual
                # EOF and an incomplete write.
                f.write(EOF + "\n")
                f.flush()
                available_annotations.release()

    thread = Thread(target = stream_target)
    thread.start()

    def generator():
        # Wait for the startup signal so the stream file is known to exist.
        sync_queue.get()
        with open(tmp_file_name, "r") as f:
            while True:
                available_annotations.acquire()

                line = ""
                c = 0
                while line == "" or line[-1] != "\n":
                    # Busy loop to wait for the file write.
                    #
                    # If we're eagerly fetching a large portion of the stream
                    # we may become bottlenecked by file synchronization. In
                    # this case, we implement a simple backoff to avoid
                    # unnecessarily hammering the file system.
                    line += f.readline()
                    c += 1
                    if c > 10:
                        time.sleep(0.005)

                data = line.strip()
                if data == EOF:
                    break

                yield json.loads(data)

            thread.join()

            # Second message on the queue: `None` on success, or the exception
            # that ended the writer thread.
            error = sync_queue.get()
            if error is not None:
                # This error came from the data loading subprocess
                raise error

    def stop_processing():
        # This is a rather gross way of killing it, but unlike `Process`, `Thread`
        # has no `terminate` method.
        nonlocal dead
        dead = True

    return DeletableGenerator(generator(), stop_processing)
def assert_one(item_list: List[~_T]) ‑> ~_T

Given a list of items, asserts that the list is a singleton, and returns its value.

Expand source code
def assert_one(item_list: List[_T]) -> _T:
    """
    Returns the sole element of `item_list`.

    Raises an `AssertionError` whose arguments are a message and the
    offending list when the list does not hold exactly one item.
    """
    count = len(item_list)
    if count == 1:
        return item_list[0]

    raise AssertionError(f"Expected one item in list, but found {count}", item_list)
def basic_repr(class_name: str, *args: Any, **kwargs: Any) ‑> str

A function to be used for defining a class's __repr__ method. When possible, will pretty-print the object in a way that is both easy to read, and useful for testing.

from datatap.utils import basic_repr

class Person:
    name: str
    age: int
    height: int

    def __repr__(self):
        return basic_repr("Person", self.name, age = self.age, height = self.height)
Expand source code
def basic_repr(class_name: str, *args: Any, **kwargs: Any) -> str:
    """
    Builds the string for a class's `__repr__` method in the form
    `ClassName(positional, ..., key = value, ...)`.

    Keyword properties whose value is `None` are left out. The result is
    plain text unless the session is interactive or pretty-printing has been
    forced, in which case it is decorated with ANSI colors.

    ```py
    from datatap.utils import basic_repr

    class Person:
        name: str
        age: int
        height: int

        def __repr__(self):
            return basic_repr("Person", self.name, age = self.age, height = self.height)
    ```
    """
    # `None`-valued keyword properties are dropped in both output modes.
    shown_kwargs = [(key, value) for key, value in kwargs.items() if value is not None]

    if IS_INTERACTIVE or pretty_print:
        # Colored output: positional values are wrapped in green/clear, named
        # ones are rendered as a red key, a purple "=", and a colored value.
        pieces = [f"{_ansi['green']}{color_repr(value)}{_ansi['clear']}" for value in args]
        pieces += [
            f"{_ansi['red']}{key} {_ansi['purple']}= {color_repr(value)}"
            for key, value in shown_kwargs
        ]
        separator = f"{_ansi['cyan']},{_ansi['clear']} "
        body = separator.join(pieces)
        return f"{_ansi['yellow']}{class_name}{_ansi['cyan']}({_ansi['clear']}{body}{_ansi['cyan']}){_ansi['clear']}"

    # Plain output, suitable for logs and for comparing in tests.
    pieces = [repr(value) for value in args]
    pieces += [f"{key} = {repr(value)}" for key, value in shown_kwargs]
    body = ", ".join(pieces)
    return f"{class_name}({body})"
def color_repr(entity: Any) ‑> str

A dynamic pretty-printer that will syntax highlight different python entities.

Rarely used on its own, see basic_repr().

Expand source code
def color_repr(entity: Any) -> str:
    """
    Renders `entity` as a string with ANSI syntax highlighting.

    `None`, numbers, strings, lists, tuples and dicts get dedicated
    coloring (containers are rendered recursively); anything else falls back
    to the plain `repr`.

    Rarely used on its own, see `datatap.utils.basic_repr`.
    """
    # `None` and numbers share the same rendering: the value in orange.
    if entity is None or isinstance(entity, (int, float)):
        return f"{_ansi['orange']}{entity}{_ansi['clear']}"

    if isinstance(entity, str):
        quote = f"{_ansi['cyan']}\""
        return f"{quote}{_ansi['green']}{entity}{_ansi['clear']}{quote}{_ansi['clear']}"

    if isinstance(entity, (list, tuple)):
        sequence = cast(Union[List[Any], Tuple[Any]], entity)
        # Exact type comparison: only a true `list` gets square brackets, so
        # list subclasses are rendered with parentheses like tuples.
        if type(sequence) == list:
            opener, closer = "[", "]"
        else:
            opener, closer = "(", ")"
        separator = f"{_ansi['cyan']},{_ansi['clear']} "
        rendered_items = separator.join(color_repr(item) for item in sequence)
        return f"{_ansi['cyan']}{opener}" + rendered_items + f"{_ansi['cyan']}{closer}"

    if isinstance(entity, dict):
        mapping = cast(Dict[Any, Any], entity)
        rendered_pairs = [
            f"{color_repr(key)}{_ansi['cyan']}: {color_repr(value)}"
            for key, value in mapping.items()
        ]
        separator = f"{_ansi['cyan']},{_ansi['clear']} "
        return f"{_ansi['cyan']}{{" + separator.join(rendered_pairs) + f"{_ansi['cyan']}}}"

    return repr(entity)
def force_pretty_print()

By default, this library only uses pretty-printing when it's in an interactive environment (terminal, python shell, etc.). However, there are a few cases when pretty-printing is desired in a non-interactive environment, such as when running under Jupyter. Calling this function once will ensure all future prints will be pretty.

Expand source code
def force_pretty_print():
    """
    Switches pretty-printing on for the rest of the session.

    Pretty-printing is normally reserved for interactive environments
    (terminal, python shell, etc.). Some non-interactive environments, such as
    Jupyter, still benefit from it; calling this function once sets the
    module-level `pretty_print` flag so that all later prints are pretty.
    """
    # Rebind the module-level flag rather than a function-local name.
    global pretty_print
    pretty_print = True
def pprint(fmt: str, *args: Any, print_args: Dict[str, Any] = {}, **kwargs: Any) ‑> NoneType

Pretty printer. The first argument is a format string, and the remaining arguments are the values for the string. Additionally, the format string can access a number of ansi escape codes such as colors, clear, prev, and start.

pprint("{prev}Progress: {orange}{i}{clear}/{total}", i=i, total=total)
Expand source code
def pprint(fmt: str, *args: Any, print_args: Union[Dict[str, Any], None] = None, **kwargs: Any) -> None:
    """
    Pretty printer. The first argument is a format string, and the remaining
    arguments are the values for the string. Additionally, the format string
    can access a number of ansi escape codes such as colors, `clear`, `prev`,
    and `start`. A `{clear}` is always appended to the format string.

    `print_args`, when given, is a dictionary of extra keyword arguments
    forwarded to the built-in `print`. Names defined by the ansi table take
    precedence over keyword arguments of the same name. Standard output is
    flushed after printing.

    ```py
    pprint("{prev}Progress: {orange}{i}{clear}/{total}", i=i, total=total)
    ```
    """
    # `None` stands in for "no extra arguments" so that no mutable dictionary
    # is shared between calls as a default value.
    extra_print_args = {} if print_args is None else print_args
    print((fmt + "{clear}").format(*args, **{**kwargs, **_ansi}), **extra_print_args)
    sys.stdout.flush()
def pprints(fmt: str, *args: Any, **kwargs: Any) ‑> str

Pretty prints to a string.

See pprint().

Expand source code
def pprints(fmt: str, *args: Any, **kwargs: Any) -> str:
    """
    Formats like the pretty printer, but returns the string instead of
    printing it.

    See `datatap.utils.pprint`.
    """
    # A trailing `{clear}` is always appended to the template.
    template = fmt + "{clear}"
    # Entries of the ansi table win over same-named keyword arguments.
    values = {**kwargs, **_ansi}
    return template.format(*args, **values)
def timer(name: str)
Expand source code
@contextmanager
def timer(name: str):
    """
    Context manager that times its body, then prints the elapsed time and
    the running average of all completed timings recorded under `name`.

    The timing is only recorded and printed when the body finishes without
    raising; an exception skips the bookkeeping and propagates unchanged.
    """
    # `perf_counter` is monotonic, so the measured duration cannot be skewed
    # (or made negative) by adjustments to the system clock.
    start = time.perf_counter()
    yield None
    value = time.perf_counter() - start

    # Update the running mean incrementally: (average, number of samples).
    avg, count = _timer_state.get(name, (0.0, 0))
    count += 1
    avg += (value - avg) / count
    _timer_state[name] = (avg, count)

    pprint(
        "{blue}{name} took {yellow}{value:1.3f}s{blue} for an average of {yellow}{avg:1.3f}s",
        name = name,
        value = value,
        avg = avg,
    )

Classes

class DeletableGenerator (gen: Generator[~_T, ~_U, ~_V], delete_thunk: Callable[[], NoneType])

A deletable generator wraps an existing generator with a deletion function to allow cleanup.

Expand source code
class DeletableGenerator(Generator[_T, _U, _V]):
    """
    A deletable generator wraps an existing generator with a deletion
    function to allow cleanup.

    Iteration, `send` and `throw` are forwarded to the wrapped generator.
    The deletion function is called when this wrapper is finalized.
    """

    # The wrapped generator that all operations are delegated to.
    _gen: Generator[_T, _U, _V]
    # Cleanup callback invoked from `__del__`.
    _delete: Callable[[], None]

    def __init__(self, gen: Generator[_T, _U, _V], delete_thunk: Callable[[], None]):
        self._gen = gen
        self._delete = delete_thunk

    def __next__(self):
        return next(self._gen)

    def send(self, value: _U):
        """
        Sends `value` into the wrapped generator and returns what it yields
        next.
        """
        return self._gen.send(value)

    def throw(self, excn: BaseException, val: None = None, tb: Optional[TracebackType] = None):
        """
        Raises `excn` inside the wrapped generator and returns what it yields
        next.

        `val` and `tb` are optional, matching the generator protocol. The
        inherited `close()` calls `throw(GeneratorExit)` with a single
        argument, so it only works when they have defaults.
        """
        if val is None and tb is None:
            # Single-argument form, used by `close()` and most callers.
            return self._gen.throw(excn)
        # Legacy three-argument form, forwarded unchanged.
        return self._gen.throw(excn, val, tb)

    def __del__(self):
        self._delete()

Ancestors

  • collections.abc.Generator
  • collections.abc.Iterator
  • collections.abc.Iterable
  • typing.Generic

Methods

def send(self, value: ~_U)

Send a value into the generator. Return next yielded value or raise StopIteration.

Expand source code
def send(self, value: _U):
    """
    Forwards `value` to the wrapped generator's `send` and returns its
    result.
    """
    wrapped = self._gen
    return wrapped.send(value)
def throw(self, excn: BaseException, val: None, tb: Union[traceback, NoneType])

Raise an exception in the generator. Return next yielded value or raise StopIteration.

Expand source code
def throw(self, excn: BaseException, val: None = None, tb: Optional[TracebackType] = None):
    """
    Raises `excn` inside the wrapped generator and returns what it yields
    next.

    `val` and `tb` are optional, matching the generator protocol, so the
    single-argument call `throw(exc)` works.
    """
    if val is None and tb is None:
        # Single-argument form.
        return self._gen.throw(excn)
    # Legacy three-argument form, forwarded unchanged.
    return self._gen.throw(excn, val, tb)
class Environment

A class providing static access to parameters related to the execution environment of the module.

Expand source code
class Environment:
    """
    Static holder for settings describing the environment the module runs
    in. Both values are read from environment variables once, when the class
    body is executed.
    """

    API_KEY = os.environ.get("DATATAP_API_KEY")
    """
    The default API key used for API calls, taken from `DATATAP_API_KEY`
    (`None` when the variable is unset).
    """

    BASE_URI = os.environ.get("DATATAP_BASE_URI", "https://app.datatap.dev")
    """
    The base URI used for referencing the dataTap application, e.g. for API
    calls, taken from `DATATAP_BASE_URI`. One might change this to use an
    HTTP proxy, for example.
    """

Class variables

var API_KEY

The default API key used for API calls.

var BASE_URI

The base URI used for referencing the dataTap application, e.g. for API calls. One might change this to use an HTTP proxy, for example.

class OrNullish

A helper class to represent the monad α OrNullish = α | None.

Expand source code
class OrNullish:
    """
    Namespace of helpers for the monad `α OrNullish = α | None`.
    """

    @staticmethod
    def bind(val: Optional[_T], fn: Callable[[_T], Optional[_S]]) -> Optional[_S]:
        """
        Monadic bind: applies `fn` to `val` and returns its result, unless
        `val` is `None`, in which case `fn` is not called and `None` is
        returned.
        """
        # Only `None` short-circuits; other falsy values are passed to `fn`.
        return None if val is None else fn(val)

Static methods

def bind(val: Union[~_T, NoneType], fn: Callable[[~_T], Union[~_S, NoneType]]) ‑> Union[~_S, NoneType]

Monadically binds fn to the value of val.

Expand source code
@staticmethod
def bind(val: Optional[_T], fn: Callable[[_T], Optional[_S]]) -> Optional[_S]:
    """
    Monadic bind: returns `fn(val)`, or `None` without calling `fn` when
    `val` is `None`.
    """
    # Only `None` short-circuits; other falsy values are passed to `fn`.
    if val is not None:
        return fn(val)
    return None