import ruamel.yaml
import logging
import os
import os.path as path
import requests
import re
import glob
from subprocess import call
import emloop as el
class IteraitDataset(el.BaseDataset):
    """
    WARNING: must be adapted to Andy.

    Base Iterait dataset providing methods for:

    - downloading annotations from Andy
    - creating symlinks to data @eric
    - building **hipipe** datasets

    All Iterait dataset configs must contain ``data_root``.

    Available options to be configured are:

    - ``task_ids``: a list of Andy task IDs to download the annotations for
    - ``dataset_ids``: a list of Andy dataset IDs to make symlinks to
    - ``hipipe_dirs``: a list of directories with hipipe CMake files to be built
    - ``hipipe_build_type``: **hipipe** build type (optional, defaults to ``Debug``)
    - ``annotator_url``: Andy API url (optional, defaults to ``https://andy.iterait.com``)
    - ``annotator_data_root``: Andy data root (optional, defaults to ``/var/annotator/data``)

    .. code-block:: yaml
        :caption: example usage in config

        dataset:
          # ...
          data_root: data
          iterait:
            task_ids: [1, 2, 3]
            dataset_ids: [42]
            hipipe_dirs: [dataset]
            hipipe_build_type: Release

    .. tip::
        Start your work with ``emloop dataset init ...`` which will create the ``data_root``, download the most up to
        date annotations, create symlinks to the data and build **hipipe** streams if necessary.

    .. tip::
        If you wrap a **hipipe** dataset, use :py:func:`blocks.utils.reflection.try_import` function.
        Otherwise you would not be able to create the dataset until you build it manually.
    """

    ITERAIT_SECTION_CFGNAME = 'iterait'
    """Name of the dataset iterait config section."""

    TASK_IDS_CFGNAME = 'task_ids'
    """Name of the Annotator tasks IDs configuration."""

    DATASET_IDS_CFGNAME = 'dataset_ids'
    """Name of the Annotator dataset IDs configuration."""

    DATA_ROOT_CFGNAME = 'data_root'
    """Name of the data root configuration."""

    HIPIPE_DIRS_CFGNAME = 'hipipe_dirs'
    """Name of the **hipipe** dirs configuration."""

    HIPIPE_BUILD_TYPE_CFGNAME = 'hipipe_build_type'
    """Name of the **hipipe** build type configuration."""

    ANNOTATOR_URL_CFGNAME = 'annotator_url'
    """Name of the Annotator URL configuration."""

    ANNOTATOR_DATA_ROOT_CFGNAME = 'annotator_data_root'
    """Name of the Annotator data root configuration."""

    DEFAULT_HIPIPE_BUILD_TYPE = 'Debug'
    """Default value for the **hipipe** build type configuration."""

    DEFAULT_ANNOTATOR_URL = 'https://andy.iterait.com'
    """Default value for the Annotator URL configuration."""

    DEFAULT_ANNOTATOR_DATA_ROOT = '/var/annotator/data'
    """Default value for the Annotator data root configuration."""

    ANNOTATOR_USERNAME_ENV_VARIABLE = 'ANDY_USER'
    """Name of the Annotator username env. variable."""

    ANNOTATOR_PASSWORD_ENV_VARIABLE = 'ANDY_PASS'
    """Name of the Annotator password env. variable."""

    def __init__(self, config_str: str):
        """
        Parse the YAML config, remember ``data_root`` and the optional ``iterait`` section.

        :param config_str: dataset configuration as a YAML string
        :raises AssertionError: if ``data_root`` is missing from the config
        """
        config = ruamel.yaml.load(config_str, ruamel.yaml.RoundTripLoader)
        assert IteraitDataset.DATA_ROOT_CFGNAME in config, '`{}` must be specified in Iterait dataset config'\
            .format(IteraitDataset.DATA_ROOT_CFGNAME)
        self._data_root = config[IteraitDataset.DATA_ROOT_CFGNAME]
        # the iterait section is optional; all iterait-specific methods check for its presence
        self._iterait_config = None
        if IteraitDataset.ITERAIT_SECTION_CFGNAME in config:
            self._iterait_config = config[IteraitDataset.ITERAIT_SECTION_CFGNAME]
        else:
            logging.warning('Missing `%s` section in Iterait dataset config', IteraitDataset.ITERAIT_SECTION_CFGNAME)
        super().__init__(config_str)

    def _maybe_create_data_root(self) -> None:
        """Create the ``data_root`` directory (including any missing parents) if it does not exist yet."""
        if not path.exists(self._data_root):
            logging.info('Creating data root `%s`', self._data_root)
            # makedirs instead of mkdir so that a nested data_root such as `data/foo` works as well
            os.makedirs(self._data_root, exist_ok=True)

    def init(self) -> None:
        """
        Initialize the dataset, in particular:

        - create ``data_root`` dir if necessary
        - symlink all the data dirs if ``dataset_ids`` is specified
        - symlink all the annotations if ``task_ids`` is specified
        - build **hipipe** streams if ``hipipe_dirs`` is specified
        """
        self._maybe_create_data_root()
        if self._iterait_config is not None:
            logging.info('Initializing the dataset')
            if IteraitDataset.TASK_IDS_CFGNAME in self._iterait_config:
                self.download_annotations()
            if IteraitDataset.DATASET_IDS_CFGNAME in self._iterait_config:
                self.symlink_data()
            if IteraitDataset.HIPIPE_DIRS_CFGNAME in self._iterait_config:
                self.build()
        # optional hooks possibly defined by sub-classes
        if hasattr(self, 'split'):
            self.split()
        if hasattr(self, 'check'):
            self.check()

    def download_annotations(self) -> None:
        """
        Download the most up to date annotations for the configured task IDs.

        Authenticates against the Annotator API with the credentials taken from the
        ``ANDY_USER``/``ANDY_PASS`` env. variables and saves one CSV file per task to ``data_root``.

        :raises ValueError: if the task IDs are not configured, the credential env. variables
            are unset, or the Annotator authentication fails
        """
        self._maybe_create_data_root()
        if self._iterait_config is None or IteraitDataset.TASK_IDS_CFGNAME not in self._iterait_config:
            raise ValueError('Cannot download annotations unless the list of annotation task IDs is specified in '
                             'the Iterait dataset config (dataset.{}.{})'
                             .format(IteraitDataset.ITERAIT_SECTION_CFGNAME, IteraitDataset.TASK_IDS_CFGNAME))
        for var_name in [IteraitDataset.ANNOTATOR_USERNAME_ENV_VARIABLE,
                         IteraitDataset.ANNOTATOR_PASSWORD_ENV_VARIABLE]:
            if var_name not in os.environ:
                raise ValueError('Cannot download annotations since env. variable `{}` is unset'.format(var_name))
        annotator_user = os.environ[IteraitDataset.ANNOTATOR_USERNAME_ENV_VARIABLE]
        annotator_pass = os.environ[IteraitDataset.ANNOTATOR_PASSWORD_ENV_VARIABLE]
        annotator_url = self._iterait_config.get(IteraitDataset.ANNOTATOR_URL_CFGNAME,
                                                 IteraitDataset.DEFAULT_ANNOTATOR_URL)
        login_url = annotator_url+'/api/login'
        annotation_url = annotator_url+'/api/tasks/{}/results.csv'
        # obtain a bearer token first; all subsequent downloads reuse it
        response = requests.post(login_url, json={'name': annotator_user, 'password': annotator_pass})
        if response.status_code != 200:
            raise ValueError('Annotator authentication failed with status code {} and message `{}`.'
                             .format(response.status_code, response.reason))
        token = response.json()['token']
        for annotation_task_id in self._iterait_config[IteraitDataset.TASK_IDS_CFGNAME]:
            logging.info('Downloading annotations for task %s', annotation_task_id)
            response = requests.get(annotation_url.format(annotation_task_id),
                                    headers={'Authorization': 'Bearer {}'.format(token)})
            if response.status_code != 200:
                # a single failed task must not abort the remaining downloads
                logging.error('\tfailed with status code %s and message `%s`.',
                              response.status_code, response.reason)
                continue
            # the server advertises the target file name in the content-disposition header
            filename = re.findall("filename=(.+)", response.headers['content-disposition'])[0]
            filepath = path.join(self._data_root, filename)
            annotations = response.text
            logging.info('\tdownloaded %s annotations', len(annotations.split('\n')))
            logging.info('\twriting data to `%s`', filepath)
            with open(filepath, mode='w') as file:
                file.write(annotations)

    def symlink_data(self) -> None:
        """
        Make symlinks to the data dirs for the configured dataset IDs.

        :raises ValueError: if the dataset IDs are not configured or the Annotator data root
            is not a directory
        """
        self._maybe_create_data_root()
        if self._iterait_config is None or IteraitDataset.DATASET_IDS_CFGNAME not in self._iterait_config:
            raise ValueError('Cannot symlink the data unless the list of dataset IDs is specified in '
                             'the Iterait dataset config (dataset.{}.{})'
                             .format(IteraitDataset.ITERAIT_SECTION_CFGNAME, IteraitDataset.DATASET_IDS_CFGNAME))
        annotator_data_root = self._iterait_config.get(IteraitDataset.ANNOTATOR_DATA_ROOT_CFGNAME,
                                                       IteraitDataset.DEFAULT_ANNOTATOR_DATA_ROOT)
        if not path.isdir(annotator_data_root):
            raise ValueError('Specified Annotator data root `{}` is not a directory'.format(annotator_data_root))
        for dataset_id in self._iterait_config[IteraitDataset.DATASET_IDS_CFGNAME]:
            dataset_path = path.join(annotator_data_root, str(dataset_id))
            if not path.isdir(dataset_path):
                # a single missing dataset must not abort the remaining symlinks
                logging.error('Can not find data dir `%s` for dataset %s', dataset_path, dataset_id)
                continue
            logging.info('Creating symlinks for dataset %s', dataset_id)
            # next(os.walk(...))[1] lists the immediate sub-directories only
            for data_dir in next(os.walk(dataset_path))[1]:
                symlink_path = path.join(self._data_root, data_dir)
                symlink_source = path.join(dataset_path, data_dir)
                if path.exists(symlink_path):
                    logging.info('\tdata dir `%s` is already symlinked', symlink_path)
                else:
                    logging.info('\tcreating symlink `%s` -> `%s`', symlink_path, symlink_source)
                    os.symlink(symlink_source, symlink_path, target_is_directory=True)

    def build(self) -> None:
        """
        Build **hipipe** streams.

        Runs ``cmake`` and ``make`` in a ``build`` sub-directory of every configured hipipe dir
        and copies the produced shared libraries one level up.

        :raises ValueError: if the hipipe dirs are not configured or one of them is not a directory
        """
        if self._iterait_config is None or IteraitDataset.HIPIPE_DIRS_CFGNAME not in self._iterait_config:
            raise ValueError('Cannot build the streams unless the list of hipipe stream dirs is specified in '
                             'the Iterait dataset config (dataset.{}.{})'
                             .format(IteraitDataset.ITERAIT_SECTION_CFGNAME, IteraitDataset.HIPIPE_DIRS_CFGNAME))
        original_wd = os.getcwd()
        build_type = self._iterait_config.get(IteraitDataset.HIPIPE_BUILD_TYPE_CFGNAME,
                                              IteraitDataset.DEFAULT_HIPIPE_BUILD_TYPE)
        try:
            for hipipe_dir in self._iterait_config[IteraitDataset.HIPIPE_DIRS_CFGNAME]:
                hipipe_dir_path = path.join(original_wd, hipipe_dir)
                if not path.isdir(hipipe_dir_path):
                    raise ValueError('Specified hipipe dir `{}` is not a directory'.format(hipipe_dir_path))
                logging.info('Build hipipe dir `%s` in `%s` mode', hipipe_dir, build_type)
                build_dir = path.join(hipipe_dir_path, 'build')
                os.makedirs(build_dir, exist_ok=True)
                os.chdir(build_dir)
                call(['cmake', '-DCMAKE_BUILD_TYPE={}'.format(build_type), '-DCMAKE_CXX_COMPILER=clang++', '..'])
                call(['make', '-j2'])
                # copy the built shared libraries next to their sources; skip cp when nothing was built
                shared_libs = glob.glob('*.so*')
                if shared_libs:
                    call(['cp', '--remove-destination']+shared_libs+['..'])
                os.chdir(original_wd)
        finally:
            # make sure the working directory is restored even if cmake/make raises
            os.chdir(original_wd)