Module datatap.utils
A collection of primarily internal-use utilities.
Expand source code
"""
A collection of primarily internal-use utilities.
"""
from .environment import Environment
from .helpers import assert_one, timer, DeletableGenerator
from .cache_generator import CacheGenerator
from .or_nullish import OrNullish
from .print_helpers import basic_repr, color_repr, force_pretty_print, pprint, pprints
# Public surface of `datatap.utils`, grouped by the submodule each name is imported from.
__all__ = [
    # .environment
    "Environment",
    # .helpers
    "assert_one",
    "timer",
    "DeletableGenerator",
    # .cache_generator
    "CacheGenerator",
    # .or_nullish
    "OrNullish",
    # .print_helpers
    "basic_repr",
    "color_repr",
    "force_pretty_print",
    "pprint",
    "pprints",
]
Sub-modules
datatap.utils.cache_generator
datatap.utils.environment
datatap.utils.helpers
datatap.utils.or_nullish
datatap.utils.print_helpers
Functions
def CacheGenerator(file_name: str, create_stream: Callable[[], Generator[_T, Any, Any]]) ‑> Generator[~_T, None, None]
-
Expand source code
def CacheGenerator(file_name: str, create_stream: Callable[[], Generator[_T, Any, Any]]) -> Generator[_T, None, None]:
    """
    Streams the JSON-serializable elements of `create_stream()` while caching
    them on disk at `file_name`.

    If `file_name` already exists it is treated as an authoritative cache, and
    its elements are replayed without calling `create_stream`. Otherwise a
    background thread drains `create_stream()` into `f"{file_name}.stream"`,
    and the returned generator reads elements back from that file as they are
    written. Once every element has been consumed without error, the stream
    file is moved to `file_name` so that later calls read from the cache.

    Any exception raised while creating or draining the stream is re-raised
    from the returned generator.
    """
    # We can't just naively stream from the server, unfortunately. Due to the sheer
    # volume of data, and the fact that training can be such a slow process, if we
    # try to stream the data directly from server to training process, we will end
    # up filling the OS' buffer, causing significant backpressure for the server.
    #
    # Likewise, we cannot necessarily stream from the server into a local buffer,
    # as a large dataset could be greater than our available RAM.
    #
    # As a result, this method streams data from the server to a temp file in a
    # background thread. The main thread then streams from that tempfile to the
    # consumer of the stream. Finally, once all data has been read, the stream
    # file is stored as an authoritative cache file for this particular stream.
    # Subsequent calls to this function with the same arguments will then pull
    # from that file.
    #
    # Please note that as a result of file-system accesses, streaming in this manner
    # incurs a non-trivial performance cost. For production training jobs, it is
    # recommended that this function be used with a data-loader capable of running
    # on multiple threads.

    # TODO(zwade): change this to UID once we have an endpoint for fetching it
    dir_name = path.dirname(file_name)
    tmp_file_name = f"{file_name}.stream"
    # `path.dirname` returns "" for a bare file name, which `os.makedirs` rejects.
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    EOF = "EOF"

    # Checks for an authoritative cache, using it if it exists.
    if path.exists(file_name):
        def cache_generator():
            with open(file_name, "r") as f:
                # Iterate the file lazily (rather than via `readlines()`) so that a
                # cache larger than the available RAM is never loaded all at once.
                for line in f:
                    line = line.strip()
                    if line == "" or line == EOF:
                        continue
                    yield json.loads(line)

        return cache_generator()

    # `sync_queue` is used to synchronize startup and termination of the
    # producer thread, optionally propagating any errors that arise. The
    # producer always puts one startup message (None, or the exception that
    # prevented startup) and, after a successful startup, one termination
    # message (None, or the exception raised while streaming).
    sync_queue: Queue[Optional[Exception]] = Queue()
    # `available_annotations` counts how many lines have been written to the
    # stream file that have not yet been consumed. It must start at zero: the
    # default initial value of 1 would let the consumer claim a line that has
    # not been written yet.
    available_annotations = Semaphore(0)
    # `dead` is a flag that allows us to terminate our stream early
    dead = False

    def stream_target():
        try:
            stream = create_stream()
            # "w" (rather than append) discards any partial stream file left
            # behind by an earlier, interrupted run. Otherwise the consumer would
            # replay that stale data and stop at its old EOF marker.
            f = open(tmp_file_name, "w")
        except Exception as e:
            # Report the failure in place of the startup signal so that the
            # consumer does not block forever waiting for it.
            sync_queue.put(e)
            return

        with f:
            sync_queue.put(None)
            try:
                for element in stream:
                    if dead:
                        raise Exception("Premature termination")

                    # We want to prioritize reading quickly, so after we write, we
                    # flush to the disk.
                    #
                    # (Note that we do not synchronize, as `fsync` incurs a 10x
                    # slowdown)
                    f.write(json.dumps(element) + "\n")
                    f.flush()

                    # We then "release" our semaphore to indicate that we've made a
                    # new asset available to the consumer
                    available_annotations.release()

                sync_queue.put(None)
            except Exception as e:
                sync_queue.put(e)
            finally:
                # We explicitly write "EOF" at the end of the stream, since we
                # otherwise would not be able to distinguish between the actual
                # EOF and an incomplete write.
                f.write(EOF + "\n")
                f.flush()
                available_annotations.release()

    thread = Thread(target=stream_target)
    thread.start()

    def generator():
        startup_error = sync_queue.get()
        if startup_error is not None:
            # The producer could not create the stream or open the stream file.
            thread.join()
            raise startup_error

        with open(tmp_file_name, "r") as f:
            while True:
                available_annotations.acquire()

                line = ""
                attempts = 0
                while line == "" or line[-1] != "\n":
                    # Busy loop to wait for the file write.
                    #
                    # If we're eagerly fetching a large portion of the stream
                    # we may become bottlenecked by file synchronization. In
                    # this case, we implement a simple backoff to avoid
                    # unnecessarily hammering the file system.
                    line += f.readline()
                    attempts += 1
                    if attempts > 10:
                        time.sleep(0.005)

                data = line.strip()
                if data == EOF:
                    break
                yield json.loads(data)

        thread.join()
        error = sync_queue.get()
        if error is not None:
            # This error came from the data loading thread
            raise error

        # Every element was produced and consumed, so promote the stream file to
        # the authoritative cache read by the `path.exists(file_name)` branch
        # above (which already skips the trailing EOF marker).
        os.replace(tmp_file_name, file_name)

    def stop_processing():
        # This is a rather gross way of killing it, but unlike `Process`, `Thread`
        # has no `terminate` method.
        nonlocal dead
        dead = True

    return DeletableGenerator(generator(), stop_processing)
def assert_one(item_list: List[~_T]) ‑> ~_T
-
Given a list of items, asserts that the list is a singleton, and returns its value.
Expand source code
def assert_one(item_list: List[_T]) -> _T:
    """
    Returns the single element of `item_list`.

    Raises an `AssertionError` (whose second argument is the offending list)
    when the list does not hold exactly one item.
    """
    count = len(item_list)
    if count == 1:
        return item_list[0]
    raise AssertionError(f"Expected one item in list, but found {count}", item_list)
def basic_repr(class_name: str, *args: Any, **kwargs: Any) ‑> str
-
A function to be used for defining a class's `__repr__` method. When possible, will pretty-print the object in a way that is both easy to read, and useful for testing.

```py
from datatap.utils import basic_repr

class Person:
    name: str
    age: int
    height: int

    def __repr__(self):
        return basic_repr("Person", self.name, age = self.age, height = self.height)
```
Expand source code
def basic_repr(class_name: str, *args: Any, **kwargs: Any) -> str:
    """
    A function to be used for defining a class's `__repr__` method. When
    possible, will pretty-print the object in a way that is both easy to read,
    and useful for testing.

    Positional `args` are rendered first, followed by `key = value` pairs for
    every keyword argument whose value is not `None`. Output is plain
    (`repr`-based) unless `IS_INTERACTIVE` is set or pretty-printing has been
    forced with `force_pretty_print`, in which case ANSI colors are added via
    `color_repr`.

    ```py
    from datatap.utils import basic_repr

    class Person:
        name: str
        age: int
        height: int

        def __repr__(self):
            return basic_repr("Person", self.name, age = self.age, height = self.height)
    ```
    """
    if not IS_INTERACTIVE and not pretty_print:
        positional_properties = [repr(value) for value in args]
        # Keyword arguments whose value is None are omitted entirely.
        named_properties = [f"{key} = {repr(value)}" for key, value in kwargs.items() if value is not None]
        properties = ", ".join(positional_properties + named_properties)
        return f"{class_name}({properties})"
    else:
        positional_properties = [
            f"{_ansi['green']}{color_repr(value)}{_ansi['clear']}"
            for value in args
        ]
        # Keyword arguments whose value is None are omitted entirely.
        named_properties = [
            f"{_ansi['red']}{key} {_ansi['purple']}= {color_repr(value)}"
            for key, value in kwargs.items()
            if value is not None
        ]
        properties = f"{_ansi['cyan']},{_ansi['clear']} ".join(positional_properties + named_properties)
        return f"{_ansi['yellow']}{class_name}{_ansi['cyan']}({_ansi['clear']}{properties}{_ansi['cyan']}){_ansi['clear']}"
def color_repr(entity: Any) ‑> str
-
A dynamic pretty-printer that will syntax highlight different python entities.
Rarely used on its own; see `basic_repr()`.
Expand source code
def color_repr(entity: Any) -> str:
    """
    A dynamic pretty-printer that will syntax highlight different python entities.
    Rarely used on its own, see `datatap.utils.basic_repr`.

    `None`, strings, numbers, lists, tuples, and dicts are colored (containers
    recursively). Anything else falls back to its plain `repr`.
    """
    if entity is None:
        return f"{_ansi['orange']}None{_ansi['clear']}"

    if isinstance(entity, str):
        return f"{_ansi['cyan']}\"{_ansi['green']}{entity}{_ansi['clear']}{_ansi['cyan']}\"{_ansi['clear']}"

    if isinstance(entity, (int, float)):
        return f"{_ansi['orange']}{entity}{_ansi['clear']}"

    if isinstance(entity, (list, tuple)):
        entity_list = cast(Union[List[Any], Tuple[Any]], entity)
        # `isinstance` (rather than `type(...) == list`) keeps square brackets for
        # list subclasses instead of rendering them like tuples.
        if isinstance(entity_list, list):
            open_bracket, close_bracket = "[", "]"
        else:
            open_bracket, close_bracket = "(", ")"
        return (
            f"{_ansi['cyan']}{open_bracket}" +
            f"{_ansi['cyan']},{_ansi['clear']} ".join([color_repr(e) for e in entity_list]) +
            f"{_ansi['cyan']}{close_bracket}"
        )

    if isinstance(entity, dict):
        entity_dict = cast(Dict[Any, Any], entity)
        return (
            f"{_ansi['cyan']}{{" +
            f"{_ansi['cyan']},{_ansi['clear']} ".join([
                f"{color_repr(key)}{_ansi['cyan']}: {color_repr(value)}"
                for key, value in entity_dict.items()
            ]) +
            f"{_ansi['cyan']}}}"
        )

    return repr(entity)
def force_pretty_print()
-
By default, this library only uses pretty-printing when it's in an interactive environment (terminal, python shell, etc.). However, there are a few cases when pretty-printing is desired in a non-interactive environment, such as when running under Jupyter. Calling this function once will ensure all future prints will be pretty.
Expand source code
def force_pretty_print():
    """
    Turns on pretty-printing for the rest of the process.

    Pretty-printing is normally reserved for interactive environments
    (terminal, python shell, etc.), but some non-interactive environments, such
    as Jupyter, still benefit from it. A single call is enough; every later
    print will be pretty.
    """
    # Rebinds the module-level flag that `basic_repr` consults.
    global pretty_print
    pretty_print = True
def pprint(fmt: str, *args: Any, print_args: Dict[str, Any] = {}, **kwargs: Any) ‑> None
-
Pretty printer. The first argument is a format string, and the remaining arguments are the values for the string. Additionally, the format string can access a number of ANSI escape codes, such as colors, `clear`, `prev`, and `start`.

```py
pprint("{prev}Progress: {orange}{i}{clear}/{total}", i=i, total=total)
```
Expand source code
def pprint(fmt: str, *args: Any, print_args: Union[Dict[str, Any], None] = None, **kwargs: Any) -> None:
    """
    Pretty printer. The first argument is a format string, and the remaining
    arguments are the values for the string. Additionally, the format string
    can access a number of ansi escape codes such as colors, `clear`, `prev`,
    and `start`. ANSI code names take precedence over keyword arguments with
    the same name, and a trailing `{clear}` is always appended.

    `print_args`, if given, is forwarded as keyword arguments to `print`.

    ```py
    pprint("{prev}Progress: {orange}{i}{clear}/{total}", i=i, total=total)
    ```
    """
    # `None` instead of a `{}` default avoids sharing one mutable dict across calls.
    if print_args is None:
        print_args = {}
    print((fmt + "{clear}").format(*args, **{**kwargs, **_ansi}), **print_args)
    sys.stdout.flush()
def pprints(fmt: str, *args: Any, **kwargs: Any) ‑> str
-
Pretty prints to a string.
See `pprint()`.
Expand source code
def pprints(fmt: str, *args: Any, **kwargs: Any) -> str:
    """
    Formats `fmt` the same way `datatap.utils.pprint` does, but returns the
    resulting string instead of printing it.

    A trailing `{clear}` is always appended, and ANSI code names take precedence
    over keyword arguments with the same name.
    """
    template = fmt + "{clear}"
    substitutions = dict(kwargs)
    substitutions.update(_ansi)
    return template.format(*args, **substitutions)
def timer(name: str)
-
Expand source code
@contextmanager
def timer(name: str):
    """
    Context manager that times its body, updates the running average recorded
    for `name`, and prints both values.

    Running averages are kept per `name` in the module-level `_timer_state` as
    `(average, count)` pairs. If the body raises, nothing is recorded or printed.

    ```py
    with timer("step"):
        ...
    ```
    """
    # `perf_counter` is monotonic and high-resolution. `time.time` can jump
    # when the system clock is adjusted, producing bogus (even negative) durations.
    start = time.perf_counter()
    yield None
    value = time.perf_counter() - start

    avg, count = _timer_state.get(name, (0.0, 0))
    count += 1
    # Incremental mean, so individual samples never need to be stored.
    avg += (value - avg) / count
    _timer_state[name] = (avg, count)

    pprint(
        "{blue}{name} took {yellow}{value:1.3f}s{blue} for an average of {yellow}{avg:1.3f}s",
        name = name,
        value = value,
        avg = avg,
    )
Classes
class DeletableGenerator (gen: Generator[~_T, ~_U, ~_V], delete_thunk: Callable[[], None])
-
A deletable generator wraps an existing generator with a deletion function to allow cleanup.
Expand source code
class DeletableGenerator(Generator[_T, _U, _V]):
    """
    A deletable generator wraps an existing generator with a deletion function to allow cleanup.

    Iteration, `send`, and `throw` are forwarded to the wrapped generator.
    `close` is inherited from `collections.abc.Generator`, which implements it
    as `self.throw(GeneratorExit)`. The deletion thunk runs when the wrapper
    is garbage-collected.
    """

    _gen: Generator[_T, _U, _V]
    _delete: Callable[[], None]

    def __init__(self, gen: Generator[_T, _U, _V], delete_thunk: Callable[[], None]):
        self._gen = gen
        self._delete = delete_thunk

    def __next__(self):
        return next(self._gen)

    def send(self, value: _U):
        return self._gen.send(value)

    def throw(self, excn: BaseException, val: object = None, tb: Optional[TracebackType] = None):
        """
        Raises `excn` (an exception class or instance) inside the wrapped
        generator and returns the next value it yields.

        `val` and `tb` are optional, matching `generator.throw`. This matters
        because the inherited `close()` calls `throw(GeneratorExit)` with a
        single argument.
        """
        # Prefer the one-argument form: the three-argument signature of
        # `generator.throw` is deprecated as of Python 3.12.
        if val is None and tb is None:
            return self._gen.throw(excn)
        return self._gen.throw(excn, val, tb)

    def __del__(self):
        # `getattr` guards against `__init__` having failed before `_delete` was set.
        delete = getattr(self, "_delete", None)
        if delete is not None:
            delete()
Ancestors
- collections.abc.Generator
- collections.abc.Iterator
- collections.abc.Iterable
- typing.Generic
Methods
def send(self, value: ~_U)
-
Send a value into the generator. Return next yielded value or raise StopIteration.
Expand source code
def send(self, value: _U):
    """Sends `value` into the wrapped generator and returns whatever it yields next."""
    wrapped = self._gen
    return wrapped.send(value)
def throw(self, excn: BaseException, val: None, tb: Optional[traceback])
-
Raise an exception in the generator. Return next yielded value or raise StopIteration.
Expand source code
def throw(self, excn: BaseException, val: object = None, tb: Optional[TracebackType] = None):
    """
    Raises `excn` (an exception class or instance) inside the wrapped
    generator and returns the next value it yields.

    `val` and `tb` are optional, matching `generator.throw`. This matters
    because `collections.abc.Generator.close()` calls `throw(GeneratorExit)`
    with a single argument.
    """
    # Prefer the one-argument form: the three-argument signature of
    # `generator.throw` is deprecated as of Python 3.12.
    if val is None and tb is None:
        return self._gen.throw(excn)
    return self._gen.throw(excn, val, tb)
class Environment
-
A class providing static access to parameters related to the execution environment of the module.
Expand source code
class Environment:
    """
    A class providing static access to parameters related to the execution
    environment of the module.

    Both values are read from the process environment once, when this class
    is defined.
    """

    API_KEY = os.environ.get("DATATAP_API_KEY")
    """
    The default API key used for API calls (from `DATATAP_API_KEY`; `None` if unset).
    """

    BASE_URI = os.environ.get("DATATAP_BASE_URI", "https://app.datatap.dev")
    """
    The base URI used for referencing the dataTap application, e.g. for API calls.
    One might change this to use an HTTP proxy, for example.
    """
Class variables
var API_KEY
-
The default API key used for API calls.
var BASE_URI
-
The base URI used for referencing the dataTap application, e.g. for API calls. One might change this to use an HTTP proxy, for example.
class OrNullish
-
A helper class to represent the monad `α OrNullish = α | None`.
Expand source code
class OrNullish:
    """
    A helper class to represent the monad `α OrNullish = α | None`.
    """

    @staticmethod
    def bind(val: Optional[_T], fn: Callable[[_T], Optional[_S]]) -> Optional[_S]:
        """
        Monadically binds `fn` to the value of `val`: `None` short-circuits to
        `None`, and any other value (including falsy ones like `0` or `""`) is
        passed to `fn`.
        """
        return None if val is None else fn(val)
Static methods
def bind(val: Optional[~_T], fn: Callable[[~_T], Optional[~_S]]) ‑> Optional[~_S]
-
Monadically binds `fn` to the value of `val`.
Expand source code
@staticmethod
def bind(val: Optional[_T], fn: Callable[[_T], Optional[_S]]) -> Optional[_S]:
    """
    Monadically binds `fn` to the value of `val`.

    `fn` is only called when `val` is not `None`; otherwise `None` is returned.
    """
    if val is not None:
        return fn(val)
    return None