Module datatap.tf

The tf module provides utilities for using dataTap with TensorFlow.

Please note that to use this module, you will need to either install TensorFlow manually or install dataTap with the tf extra:

pip install 'datatap[tf]'

This module exports two helper functions for creating TensorFlow datasets. Here is an example of the single-process one, create_dataset:

import itertools
from datatap import Api
from datatap.tf import create_dataset

api = Api()
dataset = api.get_default_database().get_dataset_list()[0]
latest_version = dataset.latest_version

dataset = create_dataset(latest_version, "training", num_workers = 4)
for (_image, bounding_boxes, labels) in itertools.islice(dataset, 3):
    print(bounding_boxes, labels)
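
The second helper, create_multi_worker_dataset, shards the same stream across machines under a tf.distribute strategy. Continuing from the example above, a minimal sketch (the choice of MultiWorkerMirroredStrategy here is an assumption, not something the library mandates):

import tensorflow as tf
from datatap.tf import create_multi_worker_dataset

# Any tf.distribute strategy should work; MultiWorkerMirroredStrategy is one common choice
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
dist_dataset = create_multi_worker_dataset(strategy, latest_version, "training", num_workers = 4)
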
Source code
"""
The `tf` module provides utilities for using dataTap with TensorFlow.

Please note that to use this module, you will need to either install
TensorFlow manually or install dataTap with the `tf` extra:

```bash
pip install 'datatap[tf]'
```

This module exports two helper functions for creating TensorFlow datasets.
Here is an example of the single-process one, `create_dataset`:

```py
import itertools
from datatap import Api
from datatap.tf import create_dataset

api = Api()
dataset = api.get_default_database().get_dataset_list()[0]
latest_version = dataset.latest_version

dataset = create_dataset(latest_version, "training", num_workers = 4)
for (_image, bounding_boxes, labels) in itertools.islice(dataset, 3):
    print(bounding_boxes, labels)
```
"""

from .dataset import create_dataset, create_multi_worker_dataset

__all__ = [
    "create_dataset",
    "create_multi_worker_dataset",
]

Sub-modules

datatap.tf.dataset

Functions

def create_dataset(dataset: Dataset, split: str, input_class_mapping: Optional[Dict[str, int]] = None, num_workers: int = cpu_count() or 1, input_context: Optional[tf.distribute.InputContext] = None)

Creates a TensorFlow Dataset object that will load a specified split of dataset.

This function handles the necessary Dataset operations to parallelize the loading operation. Since image loading can be slow, it is recommended to have num_workers set to a value greater than 1. By default, it will try to load one image per CPU.

If you intend to use the dataset across multiple processes or computers, consider using create_multi_worker_dataset instead.
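
For example, input_class_mapping lets you pin class names to fixed integer labels instead of relying on the automatically generated mapping. A minimal sketch, assuming hypothetical class names ("person" and "vehicle"); the prefetch call is ordinary tf.data usage rather than anything create_dataset requires:

import itertools
import tensorflow as tf
from datatap import Api
from datatap.tf import create_dataset

api = Api()
latest_version = api.get_default_database().get_dataset_list()[0].latest_version

# Hypothetical mapping; keys must match the class names used by the dataset
class_mapping = { "person": 0, "vehicle": 1 }

ds = create_dataset(latest_version, "training", input_class_mapping = class_mapping, num_workers = 8)
ds = ds.prefetch(tf.data.experimental.AUTOTUNE)  # overlap image downloads with consumption

for (_image, bounding_boxes, labels) in itertools.islice(ds, 3):
    print(bounding_boxes, labels)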

Source code
def create_dataset(
        dataset: Dataset,
        split: str,
        input_class_mapping: Optional[Dict[str, int]] = None,
        num_workers: int = cpu_count() or 1,
        input_context: Optional[tf.distribute.InputContext] = None
):
    """
    Creates a TensorFlow `Dataset` object that will load a specified split of `dataset`.

    This function handles the necessary `Dataset` operations to parallelize the loading
    operation. Since image loading can be slow, it is recommended to have `num_workers`
    set to a value greater than 1. By default, it will try to load one image per CPU.

    If you intend to use the dataset across multiple processes or computers, consider
    using `create_multi_worker_dataset` instead.
    """
    class_mapping = _get_class_mapping(dataset, input_class_mapping)

    def gen():
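        # Stream this worker's shard of the split, yielding (image_url, bounding_boxes, labels) tuples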
        worker_id = input_context.input_pipeline_id if input_context is not None else 0
        num_workers = input_context.num_input_pipelines if input_context is not None else 1

        for droplet in dataset.stream_split(split, worker_id, num_workers):
            image_url = tf.constant(droplet.image.paths[0])

            bounding_boxes = tf.stack([
                tf.constant(i.bounding_box.rectangle.to_xywh_tuple(), shape=(4,), dtype=tf.float64)
                for cls in droplet.classes.keys()
                for i in droplet.classes[cls].instances
                if i.bounding_box is not None
            ])

            labels = tf.stack([
                tf.constant(class_mapping[cls], dtype=tf.int32)
                for cls in droplet.classes.keys()
                for _ in droplet.classes[cls].instances
            ])

            yield (image_url, bounding_boxes, labels)

    def _load_img_fn(image_url: tf.Tensor):
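        # Download the image bytes over HTTP and decode them into a uint8 RGB tensor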
        res = requests.get(image_url.numpy().decode("ascii"))
        img = tf.io.decode_jpeg(res.content, channels=3)
        return img

    def load_img_fn(image_url: tf.Tensor):
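        # Wrap the eager download in tf.py_function so it can run inside the tf.data pipeline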
        return tf.py_function(_load_img_fn, inp=(image_url,), Tout=(tf.uint8,))

    def map_fn(image_url: tf.Tensor, boxes: tf.Tensor, labels: tf.Tensor):
        return (load_img_fn(image_url), boxes, labels)

    ds = tf.data.Dataset.from_generator(
        gen,
        (tf.string, tf.float64, tf.int32),
        (tf.TensorShape(()), tf.TensorShape((None, 4)), tf.TensorShape((None,)))
    )

    return ds.map(map_fn, num_parallel_calls=num_workers)

def create_multi_worker_dataset(strategy: tf.distribute.experimental.Strategy, dataset: Dataset, split: str, num_workers: int = cpu_count() or 1, input_class_mapping: Optional[Dict[str, int]] = None)

Creates a multi-worker sharded dataset. In addition to sharding the contents of the dataset across multiple machines, this function will also attempt to load the images across several workers.

If you are running multiple workers on the same physical machine, consider lowering the value of num_workers, as by default each worker will try to use every CPU on the machine.
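
As a rough, self-contained sketch of how the resulting distributed dataset might be consumed, the loop below runs a placeholder step on each replica. The strategy choice and the count_labels function are illustrative and not part of dataTap:

import tensorflow as tf
from datatap import Api
from datatap.tf import create_multi_worker_dataset

api = Api()
latest_version = api.get_default_database().get_dataset_list()[0].latest_version

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
dist_dataset = create_multi_worker_dataset(strategy, latest_version, "training", num_workers = 4)

def count_labels(image, bounding_boxes, labels):
    # Placeholder per-replica step; real training would run the model and apply gradients here
    return tf.shape(labels)[0]

for element in dist_dataset:
    per_replica_counts = strategy.run(count_labels, args = element)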

Source code
def create_multi_worker_dataset(
    strategy: tf.distribute.experimental.Strategy,
    dataset: Dataset,
    split: str,
    num_workers: int = cpu_count() or 1,
    input_class_mapping: Optional[Dict[str, int]] = None,
):
    """
    Creates a multi-worker sharded dataset. In addition to sharding the contents
    of the dataset across multiple machines, this function will also attempt to
    load the images across several workers.

    If you are running multiple workers on the same physical machine, consider lowering
    the value of `num_workers`, as by default each worker will try to use every CPU
    on the machine.
    """
    ds = strategy.experimental_distribute_datasets_from_function(
        # The strategy supplies the tf.distribute.InputContext as the final positional argument
        functools.partial(create_dataset, dataset, split, input_class_mapping, num_workers)
    )
    return ds