# Source code for blocks.datasets.iterait_dataset

import ruamel.yaml
import logging
import os
import os.path as path
import requests
import re
import glob
from subprocess import call

import emloop as el


class IteraitDataset(el.BaseDataset):
    """
    WARNING: must be adapted to Andy.

    Base Iterait dataset providing methods for:

    - downloading annotations from Andy
    - creating symlinks to data @eric
    - building **hipipe** datasets

    All Iterait dataset configs must contain ``data_root``.

    Available options to be configured are:

    - ``task_ids``: a list of Andy task IDs to download the annotations for
    - ``dataset_ids``: a list of Andy dataset IDs to make symlinks to
    - ``hipipe_dirs``: a list of directories with hipipe CMake files to be build
    - ``hipipe_build_type``: **hipipe** build type (optional, defaults to ``Debug``)
    - ``annotator_url``: Andy API url (optional, defaults to ``https://andy.iterait.com``)
    - ``annotator_data_root``: Andy data root (optional, defaults to ``/var/annotator/data``)

    .. code-block:: yaml
        :caption: example usage in config

        dataset:
          # ...
          data_root: data
          iterait:
            task_ids: [1, 2, 3]
            dataset_ids: [42]
            hipipe_dirs: [dataset]
            hipipe_build_type: Release

    .. tip::
        Start your work with ``emloop dataset init ...`` which will create the ``data_root``,
        download the most up to date annotations, create symlinks to the data and build
        **hipipe** streams if necessary.

    .. tip::
        If you wrap a **hipipe** dataset, use :py:func:`blocks.utils.reflection.try_import`
        function. Otherwise you would not be able to create the dataset until you build it
        manually.
    """

    ITERAIT_SECTION_CFGNAME = 'iterait'
    """Name of the dataset iterait config section."""

    TASK_IDS_CFGNAME = 'task_ids'
    """Name of the Annotator tasks IDs configuration."""

    DATASET_IDS_CFGNAME = 'dataset_ids'
    """Name of the Annotator dataset IDs configuration."""

    DATA_ROOT_CFGNAME = 'data_root'
    """Name of the data root configuration."""

    HIPIPE_DIRS_CFGNAME = 'hipipe_dirs'
    """Name of the **hipipe** dirs configuration."""

    HIPIPE_BUILD_TYPE_CFGNAME = 'hipipe_build_type'
    """Name of the **hipipe** build type configuration."""

    ANNOTATOR_URL_CFGNAME = 'annotator_url'
    """Name of the Annotator URL configuration."""

    ANNOTATOR_DATA_ROOT_CFGNAME = 'annotator_data_root'
    """Name of the Annotator data root configuration."""

    DEFAULT_HIPIPE_BUILD_TYPE = 'Debug'
    """Default value for the **hipipe** build type configuration."""

    DEFAULT_ANNOTATOR_URL = 'https://andy.iterait.com'
    """Default value for the Annotator URL configuration."""

    DEFAULT_ANNOTATOR_DATA_ROOT = '/var/annotator/data'
    """Default value for the Annotator data root configuration."""

    ANNOTATOR_USERNAME_ENV_VARIABLE = 'ANDY_USER'
    """Name of the Annotator username env. variable."""

    ANNOTATOR_PASSWORD_ENV_VARIABLE = 'ANDY_PASS'
    """Name of the Annotator password env. variable."""

    def __init__(self, config_str: str):
        """
        Create the dataset from the given YAML configuration string.

        :param config_str: YAML string with the dataset configuration; must contain ``data_root``
        :raise AssertionError: if ``data_root`` is missing from the configuration
        """
        # NOTE: plain ``ruamel.yaml.load(..., RoundTripLoader)`` is deprecated and removed
        # in ruamel.yaml>=0.18; the ``YAML`` instance API is the supported round-trip loader.
        config = ruamel.yaml.YAML(typ='rt').load(config_str)
        assert IteraitDataset.DATA_ROOT_CFGNAME in config, '`{}` must be specified in Iterait dataset config'\
            .format(IteraitDataset.DATA_ROOT_CFGNAME)
        self._data_root = config[IteraitDataset.DATA_ROOT_CFGNAME]
        self._iterait_config = None
        if IteraitDataset.ITERAIT_SECTION_CFGNAME in config:
            self._iterait_config = config[IteraitDataset.ITERAIT_SECTION_CFGNAME]
        else:
            logging.warning('Missing `%s` section in Iterait dataset config',
                            IteraitDataset.ITERAIT_SECTION_CFGNAME)
        super().__init__(config_str)

    def _maybe_create_data_root(self) -> None:
        """Create the configured ``data_root`` directory (including parents) if it does not exist yet."""
        if not path.exists(self._data_root):
            logging.info('Creating data root `%s`', self._data_root)
            # makedirs handles nested data roots; plain mkdir would fail on a missing parent dir
            os.makedirs(self._data_root, exist_ok=True)

    def init(self) -> None:
        """
        Initialize the dataset, in particular:

        - create ``data_root`` dir if necessary
        - symlink all the data dirs if ``dataset_ids`` is specified
        - symlink all the annotations if ``task_ids`` is specified
        - build **hipipe** streams if ``hipipe_dirs`` is specified
        """
        self._maybe_create_data_root()
        if self._iterait_config is not None:
            logging.info('Initializing the dataset')
            if IteraitDataset.TASK_IDS_CFGNAME in self._iterait_config:
                self.download_annotations()
            if IteraitDataset.DATASET_IDS_CFGNAME in self._iterait_config:
                # NOTE(review): ``symlink_data`` is not defined in this module view —
                # presumably provided by a sub-class or mixin; verify before relying on it
                self.symlink_data()
            if IteraitDataset.HIPIPE_DIRS_CFGNAME in self._iterait_config:
                self.build()
        # optional hooks that sub-classes may provide
        if hasattr(self, 'split'):
            self.split()
        if hasattr(self, 'check'):
            self.check()

    def download_annotations(self) -> None:
        """
        Download the most up to date annotations for the configured task IDs.

        Requires the ``ANDY_USER`` and ``ANDY_PASS`` env. variables to be set; the downloaded
        CSV files are written to ``data_root`` under the server-provided file names.

        :raise ValueError: if ``task_ids`` is not configured, credentials are missing,
                           or the Annotator authentication fails
        """
        self._maybe_create_data_root()
        if self._iterait_config is None or IteraitDataset.TASK_IDS_CFGNAME not in self._iterait_config:
            raise ValueError('Cannot download annotations unless the list of annotation task IDs is specified in '
                             'the Iterait dataset config (dataset.{}.{})'
                             .format(IteraitDataset.ITERAIT_SECTION_CFGNAME, IteraitDataset.TASK_IDS_CFGNAME))
        for var_name in [IteraitDataset.ANNOTATOR_USERNAME_ENV_VARIABLE,
                         IteraitDataset.ANNOTATOR_PASSWORD_ENV_VARIABLE]:
            if var_name not in os.environ:
                raise ValueError('Cannot download annotations since env. variable `{}` is unset'.format(var_name))
        annotator_user = os.environ[IteraitDataset.ANNOTATOR_USERNAME_ENV_VARIABLE]
        annotator_pass = os.environ[IteraitDataset.ANNOTATOR_PASSWORD_ENV_VARIABLE]
        annotator_url = self._iterait_config.get(IteraitDataset.ANNOTATOR_URL_CFGNAME,
                                                 IteraitDataset.DEFAULT_ANNOTATOR_URL)
        login_url = annotator_url + '/api/login'
        annotation_url = annotator_url + '/api/tasks/{}/results.csv'

        response = requests.post(login_url, json={'name': annotator_user, 'password': annotator_pass})
        if response.status_code != 200:
            raise ValueError('Annotator authentication failed with status code {} and message `{}`.'
                             .format(response.status_code, response.reason))
        token = response.json()['token']

        for annotation_task_id in self._iterait_config[IteraitDataset.TASK_IDS_CFGNAME]:
            logging.info('Downloading annotations for task %s', annotation_task_id)
            response = requests.get(annotation_url.format(annotation_task_id),
                                    headers={'Authorization': 'Bearer {}'.format(token)})
            if response.status_code != 200:
                logging.error('\tfailed with status code %s and message `%s`.',
                              response.status_code, response.reason)
                continue
            # the server provides the target file name in the content-disposition header;
            # skip (instead of crashing with IndexError) when it cannot be parsed
            filename_match = re.findall("filename=(.+)", response.headers['content-disposition'])
            if not filename_match:
                logging.error('\tcould not parse the annotation file name; skipping task %s', annotation_task_id)
                continue
            filepath = path.join(self._data_root, filename_match[0])
            annotations = response.text
            logging.info('\tdownloaded %s annotations', len(annotations.split('\n')))
            logging.info('\twriting data to `%s`', filepath)
            with open(filepath, mode='w') as file:
                file.write(annotations)

    def build(self) -> None:
        """
        Build **hipipe** streams.

        For every directory listed under ``hipipe_dirs``, create a ``build`` sub-directory,
        run ``cmake`` and ``make`` in it, and copy the produced shared objects back to the
        hipipe directory.

        :raise ValueError: if ``hipipe_dirs`` is not configured or one of the dirs does not exist
        :raise RuntimeError: if any of the build commands fails
        """
        if self._iterait_config is None or IteraitDataset.HIPIPE_DIRS_CFGNAME not in self._iterait_config:
            raise ValueError('Cannot build the streams unless the list of hipipe stream dirs is specified in '
                             'the Iterait dataset config (dataset.{}.{})'
                             .format(IteraitDataset.ITERAIT_SECTION_CFGNAME, IteraitDataset.HIPIPE_DIRS_CFGNAME))
        build_type = self._iterait_config.get(IteraitDataset.HIPIPE_BUILD_TYPE_CFGNAME,
                                              IteraitDataset.DEFAULT_HIPIPE_BUILD_TYPE)
        for hipipe_dir in self._iterait_config[IteraitDataset.HIPIPE_DIRS_CFGNAME]:
            hipipe_dir_path = path.join(os.getcwd(), hipipe_dir)
            if not path.isdir(hipipe_dir_path):
                raise ValueError('Specified hipipe dir `{}` is not a directory'.format(hipipe_dir_path))
            logging.info('Build hipipe dir `%s` in `%s` mode', hipipe_dir, build_type)
            build_dir = path.join(hipipe_dir_path, 'build')
            os.makedirs(build_dir, exist_ok=True)
            # run the build via the ``cwd=`` argument instead of os.chdir so the process
            # working directory is never left modified if a build step raises
            for command in (['cmake', '-DCMAKE_BUILD_TYPE={}'.format(build_type),
                             '-DCMAKE_CXX_COMPILER=clang++', '..'],
                            ['make', '-j2']):
                # previously, non-zero exit codes were silently ignored
                if call(command, cwd=build_dir) != 0:
                    raise RuntimeError('Command `{}` failed in `{}`'.format(' '.join(command), build_dir))
            shared_objects = glob.glob(path.join(build_dir, '*.so*'))
            if shared_objects:  # cp with no source files would fail
                call(['cp', '--remove-destination'] + shared_objects + [hipipe_dir_path])