Source code for blocks.datasets.rpn

import math
import yaml
import logging
from abc import abstractmethod
from collections import namedtuple
from typing import Tuple, Iterable, Sequence, Any

import emloop as el
import numpy as np

from .iterait_dataset import IteraitDataset

# Axis-aligned rectangle in min-max (top-left -> bottom-right) notation.
Rectangle = namedtuple(typename='Rectangle', field_names='xmin ymin xmax ymax')

# An anchor is a plain sequence of numbers; its meaning is given by the concrete dataset
# (the rectangle dataset in this module uses (center x, center y, width, height)).
Anchor = Sequence[float]


class RPNDataset(IteraitDataset):
    """
    Base RPN dataset wrapper for region classification and regression as the target.

    This dataset may be adjusted for any region type such as rectangles or ellipses.
    Subclasses provide the region geometry (:py:meth:`_gen_anchors`,
    :py:meth:`anchor_region_overlap`, :py:meth:`get_diff`, :py:meth:`apply_diff`,
    :py:meth:`diffs_dim`) and the region source (:py:meth:`get_regions`).
    """

    def __init__(self, config_str: str):
        """
        Create new RPNDataset.

        :param config_str: yaml encoded configuration string; the ``anchor`` section must contain
                           ``n_per_example`` and may contain ``min_positive_overlap`` (default 0.7)
                           and ``max_negative_overlap`` (default 0.3)
        """
        # ``yaml.load`` without an explicit Loader may construct arbitrary objects and is rejected
        # by PyYAML >= 6; the configuration is plain data, so the safe loader is sufficient.
        self._config = yaml.safe_load(config_str)

        anchor_config = self._config['anchor']
        self._anchor_per_example = anchor_config['n_per_example']
        self._anchor_min_positive_overlap = anchor_config.get('min_positive_overlap', 0.7)
        self._anchor_max_negative_overlap = anchor_config.get('max_negative_overlap', 0.3)

        # the following attributes will be set in the configure_shape method
        self._encoded_shape = None
        self._anchors = None
        self._pool_amount = None

        # the parent is initialized last so that the attributes above already exist at that point
        super().__init__(config_str)

    def _overlap_to_label(self, overlap: float) -> int:
        """
        Map overlap ratio to a -1/0/1 label.

        The semantics are the following:

        - -1: neutral not positive nor negative anchor
        - 0: negative anchor (no object is within it)
        - 1: positive anchor (object region has high overlap with the anchor)

        :param overlap: overlap ratio in 0-1 interval
        :return: anchor label
        """
        if overlap > self._anchor_min_positive_overlap:
            return 1
        if overlap <= self._anchor_max_negative_overlap:
            return 0
        return -1

    @abstractmethod
    def _gen_anchors(self, pos=(0, 0)) -> Iterable[Anchor]:
        """
        Generate anchors at the given position.

        :param pos: (x, y) anchor centers
        :return: anchors generator
        """

    @abstractmethod
    def anchor_region_overlap(self, anchor: Anchor, region: Any) -> float:
        """
        Calculate anchor-region overlap.

        :param anchor: anchor
        :param region: region
        :return: overlap ratio in 0-1 interval
        """

    @abstractmethod
    def apply_diff(self, anchor: Anchor, diff: Anchor) -> Anchor:
        """
        Apply predicted diff to the given anchor and return the result.

        :param anchor: anchor
        :param diff: predicted region difference with respect to the anchor
        :return: new anchor
        """

    @abstractmethod
    def get_diff(self, anchor: Anchor, region: Any) -> Anchor:
        """
        Get trainable difference of the given anchor to the given region.

        :param anchor: base anchor
        :param region: target region
        :return: anchor-region trainable difference
        """

    @abstractmethod
    def diffs_dim(self) -> int:
        """Return trainable region-anchor diffs dimension. E.g. for rectangles this would be 4."""

    @abstractmethod
    def get_regions(self, batch: el.Batch, index: int) -> Iterable[Any]:
        """Return an iteration of regions for the given batch and example id."""

    def get_sensible_anchor_indices(self, region: Any) -> Iterable[Tuple[int, int]]:
        """
        Return an iteration of anchor indices (x, y) to be considered for the given region.

        By default, all the anchor indices are returned which may be computationally expensive.
        You may want to limit the amount of returned indices, e.g.: return only anchors from certain radius
        around the region.

        :param region: region the anchors are matched against (unused by the default implementation)
        :return: generator of (x, y) anchor grid indices
        """
        height, width = self._anchors.shape[:2]
        for y in range(height):
            for x in range(width):
                yield (x, y)

    @property
    def n_anchors_per_position(self):
        """The number of anchors for each position."""
        return sum(1 for _ in self._gen_anchors())

    def configure_shape(self, features_spatial_dim: Tuple[int, int], pool_amount: int) -> None:
        """
        Configure the dataset with the features spatial dimension and amount of pooling the input images experience.

        .. note::
            The dataset has to be configured at least once prior to utilizing the streams.

        :param features_spatial_dim: (height, width) feature spatial dimension
        :param pool_amount: amount of pooling of the input images (two maxpool-2 would yield 4)
        """
        # coerce to a tuple so that lists (or other sequences) can be concatenated below
        self._encoded_shape = tuple(features_spatial_dim)
        self._pool_amount = pool_amount

        anchors_shape = self._encoded_shape + (self.n_anchors_per_position,)
        # NOTE: the anchors are stored as int32, i.e. the generated anchor values are cast to integers
        self._anchors = np.zeros(anchors_shape + (self.diffs_dim(),), dtype=np.int32)

        logging.info('Generating anchors')
        half_pool = self._pool_amount // 2
        for y in range(self._anchors.shape[0]):
            for x in range(self._anchors.shape[1]):
                # anchor center in the input image coordinates (middle of the pooled cell)
                center = (x * self._pool_amount + half_pool, y * self._pool_amount + half_pool)
                for anchor_ix, anchor in enumerate(self._gen_anchors(center)):
                    self._anchors[y, x, anchor_ix] = anchor

    def transform_batch(self, batch: el.Batch) -> el.Batch:
        """
        Extend the given batch with anchors target.

        The batch is extended with the following sources (one entry per example):

        - ``anchors_label``: anchor labels (height x width x anchor)
        - ``anchors_mask``: 0/1 mask of the anchors selected for training (height x width x anchor)
        - ``diffs``: anchor diffs to the target regions (height x width x anchor x diffs_dim)
        - ``target_regions``: list of the target regions

        .. warning::
            Works only for batches with batch size 1.

        :param batch: input batch of size 1
        :return: batch extended with anchor targets
        :raise ValueError: if the dataset was not configured with :py:meth:`configure_shape`
        """
        if self._encoded_shape is None:
            raise ValueError('RPNDataset has to be configured at least once before utilizing the streams.')

        overall_anchor_labels = []
        overall_anchor_masks = []
        overall_diffs = []
        overall_target_regions = []

        n_examples = len(batch[next(iter(batch.keys()))])
        n_per_class = self._anchor_per_example // 2
        anchors_shape = self._anchors.shape[:3]

        for example_ix in range(n_examples):
            # NOTE(review): labels start as 0 (negative) and -1 is never written, hence anchors with a
            # neutral overlap are reported (and sampled) as negatives.
            anchors_label = np.zeros(anchors_shape, dtype=np.int32)
            anchors_mask = np.zeros(anchors_shape, dtype=np.uint8)  # 0/1 anchor mask denoting anchors to be trained
            diffs = np.zeros(anchors_shape + (self.diffs_dim(),), dtype=np.float32)  # diffs to the target regions

            # map regions -> promising anchors -> targets
            target_regions = list(self.get_regions(batch, example_ix))
            for region in target_regions:
                best_match = None
                best_overlap = -1
                matched = False
                for ix_x, ix_y in self.get_sensible_anchor_indices(region):
                    for anchor_ix, anchor in enumerate(self._anchors[ix_y, ix_x]):
                        overlap = self.anchor_region_overlap(anchor, region)
                        if overlap > best_overlap:
                            best_overlap = overlap
                            best_match = ix_y, ix_x, anchor_ix
                        # Only positives are written: the labels already default to 0 and writing a
                        # negative here would downgrade an anchor that is positive for another region.
                        if self._overlap_to_label(overlap) == 1:
                            matched = True
                            anchors_label[ix_y, ix_x, anchor_ix] = 1
                            diffs[ix_y, ix_x, anchor_ix] = self.get_diff(anchor, region)
                # Every region gets at least its best overlapping anchor. ``best_match`` stays None
                # when no anchor was considered at all; indexing with None would mark *every* anchor.
                if not matched and best_match is not None:
                    anchors_label[best_match] = 1
                    diffs[best_match] = self.get_diff(self._anchors[best_match], region)

            # select positive/negative anchors
            positives = np.argwhere(anchors_label == 1)
            negatives = np.argwhere(anchors_label == 0)

            # randomly select n=self._anchor_per_example anchors with the same amount of positive and negative anchors
            positives = positives[np.random.permutation(len(positives))[:n_per_class]]
            negatives = negatives[np.random.permutation(len(negatives))[:n_per_class]]

            # mark the selected anchors for training
            selected = np.concatenate((positives, negatives))
            anchors_mask[tuple(selected.T)] = 1

            overall_anchor_labels.append(anchors_label)
            overall_anchor_masks.append(anchors_mask)
            overall_diffs.append(diffs)
            overall_target_regions.append(target_regions)

        batch.update({'anchors_label': overall_anchor_labels,  # 4-dim anchor labels (batch x height x width x anchor)
                      'anchors_mask': overall_anchor_masks,  # 4-dim anchor masks (batch x height x width x anchor)
                      'diffs': overall_diffs,  # anchor diffs to the target regions (b x h x w x anchor x diffs_dim)
                      'target_regions': overall_target_regions})
        return batch

    def get_anchor(self, ix: Tuple[int, int, int]) -> Anchor:
        """
        Return the anchor stored at the given index.

        :param ix: (y, x, anchor) index into the anchors array
        :return: the anchor at the given index
        :raise ValueError: if the dataset was not configured with :py:meth:`configure_shape`
        """
        if self._anchors is None:
            raise ValueError('RPN dataset has to be configured at least once before anchors become available.')
        return self._anchors[tuple(ix)]
class RectangleRPNDataset(RPNDataset):
    """
    :py:class:`RPNDataset` embodiment for rectangles.

    Anchors and regions are handled as (center x, center y, width, height) sequences.

    .. warning::
        This dataset does not implement :py:meth:`self.get_regions` method and hence, cannot be used directly.
    """

    def _gen_anchors(self, pos=(0, 0)) -> Iterable[Anchor]:
        """
        Generate anchors at the given position.

        One anchor is generated for each combination of the configured ``anchor.sizes`` and ``anchor.ratios``;
        its width is ``size*ratio`` and its height is ``size``.

        :param pos: (x, y) anchor centers
        :return: anchors generator
        """
        center = tuple(pos)  # accept any 2-item sequence, not only tuples
        anchor_config = self._config['anchor']
        for size in anchor_config['sizes']:
            for ratio in anchor_config['ratios']:
                yield center + (size * ratio, size)

    @staticmethod
    def _to_rectangle(box: Anchor) -> Rectangle:
        """Convert (center x, center y, width, height) to the min-max (top-left -> bot-right) notion."""
        xmin = box[0] - box[2] // 2
        ymin = box[1] - box[3] // 2
        return Rectangle(xmin, ymin, xmin + box[2], ymin + box[3])

    def anchor_region_overlap(self, anchor: Anchor, region: Any) -> float:
        """
        Calculate rectangles overlap ratio.

        The ratio is the mean of the intersection fractions of the two rectangles, i.e.
        ``(intersection/anchor_area + intersection/region_area)/2``.

        :param anchor: anchor
        :param region: region
        :return: overlap ratio in 0-1 interval; 0.0 if any of the rectangles has no area
        """
        # min-max (top-left -> bot-right) notion will be handy for the computations
        arect = self._to_rectangle(anchor)
        rrect = self._to_rectangle(region)

        arect_area = (arect.xmax - arect.xmin) * (arect.ymax - arect.ymin)
        rrect_area = (rrect.xmax - rrect.xmin) * (rrect.ymax - rrect.ymin)
        # degenerate rectangles cannot overlap anything; this also avoids the division by zero below
        if arect_area <= 0 or rrect_area <= 0:
            return 0.0

        dx = min(arect.xmax, rrect.xmax) - max(arect.xmin, rrect.xmin)
        dy = min(arect.ymax, rrect.ymax) - max(arect.ymin, rrect.ymin)
        intersection = dx * dy if (dx >= 0) and (dy >= 0) else 0.0

        # intersection over union does not work well with rectangles of different sizes,
        # the mean of the two intersection fractions is used instead
        return (intersection / arect_area + intersection / rrect_area) / 2

    def apply_diff(self, anchor: Anchor, diff: Anchor) -> Anchor:
        """
        Apply predicted diff to the given anchor and return the result.

        The center is shifted additively while the width and height are scaled by ``exp`` of the diff,
        which makes this method the inverse of :py:meth:`get_diff`.

        :param anchor: anchor
        :param diff: predicted region difference with respect to the anchor
        :return: new anchor
        """
        return (anchor[0] + diff[0],
                anchor[1] + diff[1],
                anchor[2] * math.exp(diff[2]),
                anchor[3] * math.exp(diff[3]))

    def get_diff(self, anchor: Anchor, target: Anchor) -> Anchor:
        """
        Get trainable difference of the given anchor to the given region.

        The center difference is additive, the width and height differences are logarithms of the
        target/anchor size ratios (both sizes are thus expected to be positive).

        :param anchor: base anchor
        :param target: target region
        :return: anchor-region trainable difference
        """
        return (target[0] - anchor[0],
                target[1] - anchor[1],
                math.log(target[2] / anchor[2]),
                math.log(target[3] / anchor[3]))

    def diffs_dim(self) -> int:
        """Return trainable region-anchor diffs dimension."""
        return 4