# Source code for ablator.modules.metrics.stores

import bisect
import inspect
import logging
import typing as ty
from collections.abc import Callable, Sequence

import numpy as np
import torch

import ablator.utils.base as butils


def _parse_moving_average_val(
    val: np.ndarray | torch.Tensor | int | float,
) -> int | float:
    if not isinstance(val, (np.ndarray, torch.Tensor, int, float)):
        raise ValueError(f"Invalid MovingAverage value type {type(val)}")
    if isinstance(val, (np.ndarray, torch.Tensor)):
        npval: np.ndarray = butils.iter_to_numpy(val)
        try:
            scalar = npval.item()
        except Exception as exc:
            raise ValueError(f"MovingAverage value must be scalar. Got {val}") from exc
    else:
        scalar = val
    return scalar


def _parse_array_store_val(
    val: np.ndarray | int | float,
    store_type: None | type = None,
    shape: None | tuple[int, ...] = None,
) -> np.ndarray:
    if not isinstance(val, (np.ndarray, int, float)):
        raise RuntimeError(f"Invalid ArrayStore value type {type(val)}")
    np_val: np.ndarray
    if isinstance(val, (int, float)) or len(val.shape) == 0:
        np_val = np.array([[val]])
    else:
        np_val = val
    if len(np_val.shape) < 2:
        raise ValueError(
            (
                "Missing batch dimension. If supplying a single value array, "
                "reshape to [B, 1] or if suppling a single a batch reshape to [1, N]."
            )
        )
    if store_type is not None and np_val.dtype != store_type:
        raise RuntimeError(
            f"Inhomogeneous types between stored values {store_type} and provided value {np_val.dtype}."
        )
    # skipping batch dim
    data_shape = np_val.shape[1:]
    if shape is not None and data_shape != shape:
        raise RuntimeError(
            f"Inhomogeneous shapes between stored values  {shape} and provided value {data_shape}"
        )
    return np_val


[docs]class ArrayStore(Sequence): """ Base class for manipulations (storing, getting, resetting) of batches of values. Parameters ---------- batch_limit : int | None The maximum number of batches of values to store for this single store. If set to None, unlimited number of batches will be stored. Default is None. memory_limit : int | None The maximum memory allowed for the prediction store. If set to None, unlimited number of batches will be stored. Default is None. Examples -------- >>> from ablator.modules.metrics.stores import ArrayStore >>> train_metrics = ArrayStore( ... batch_limit=50, ... memory_limit=1000 ... ) """ def __init__( self, batch_limit: int | None = None, # 100 MB memory limit memory_limit: int | None = None, ): # Initialize the storage based on the memory / batch_limits provided. super().__init__() self._arr: list[np.ndarray] = [] self.limit = int(batch_limit) if batch_limit is not None else None self._memory_limit = memory_limit self._arr_len: list[int] = [] self._len = 0 self._store_type: type | None = None self._shape: tuple[int, ...] | None = None self._item_size: int | None = None
[docs] def append(self, val: np.ndarray | float | int): """ Appends a batch of values, or a single value, constrained on the limits. If after appending a new batch, ``batch_limit`` is exceeded, only ``batch_limit`` number of latest batches is kept. If memory limit is exceeded, ``batch_limit`` will be reduced. Parameters ---------- val : np.ndarray | float | int The data, can be a batch of data, or a scalar. Examples -------- The following example shows a case where batch limit is exceeded (100 values/batches to be appended while only 10 is allowed) >>> from ablator.modules.metrics.stores import ArrayStore >>> array_store = ArrayStore( ... batch_limit=10, ... memory_limit=1000 ... ) >>> for i in range(100): >>> array_store.append(int(i)) >>> array_store.arr [90, 91, 92, 93, 94, 95, 96, 97, 98, 99] >>> array_store.limit 10 This example shows a case where memory limit is exceeded. As soon as the 5th value is appended, memory of the list is 104 > 100), so ``batch_limit`` is set to the length of the store so far (which is 5) reduced by 1, which equals to 4. Therefore, from then on, only 4 values/batches is allowed. >>> array_store = ArrayStore( ... batch_limit=10, ... memory_limit=100 ... 
) >>> for i in range(100): >>> array_store.append(int(i)) >>> array_store.arr [96, 97, 98, 99] >>> array_store.limit 4 """ # Appending by batch is faster than converting numpy to list np_val = _parse_array_store_val( val, shape=self._shape, store_type=self._store_type ) self._arr.append(np_val) self._init_properties() np_val_len = len(np_val) self._arr_len.append(np_val_len) self._len += np_val_len if ( self._memory_limit is not None and self._item_size is not None and ( self.limit is None or self.limit > int(max(self._memory_limit // self._item_size - 1, 1)) ) and self._item_size * (self._len + 1) > self._memory_limit ): memory_limit = int(max(self._memory_limit // self._item_size - 1, 1)) limit: int = ( min(memory_limit, self.limit) if self.limit is not None else memory_limit ) logging.warning( "Memory limit %s reached for ArrayStore. Consider increasing `memory_limit`. " "Will prune to %s samples.", self._memory_limit, limit, ) self.limit = limit self._prune(limit) assert ( self._len == 1 or self._item_size * (self._len + 1) <= self._memory_limit ) elif self.limit is not None and self._len > 1 and self._len > self.limit: self._prune(self.limit)
def _init_properties(self): if len(self._arr) == 0: return if self._shape is None or self._item_size is None or self._store_type is None: last_item = self._arr[-1] self._store_type = last_item.dtype self._shape = last_item.shape[1:] self._item_size = last_item.size * last_item.itemsize @property def shape(self) -> tuple[int, ...] | None: return self._shape @property def item_size(self) -> int | None: return self._item_size @property def store_type(self) -> type | None: return self._store_type def _prune(self, limit: int): limit = int(limit) lens = np.cumsum(self._arr_len[::-1]) idx = np.argmax(lens > limit) underflow = lens[idx - 1] - limit if underflow > 0: # overflow! underflow = -limit if underflow < 0: idx += 1 self._arr[-idx] = self._arr[-idx][underflow:] self._arr_len[-idx] = -1 * underflow self._arr = self._arr[-idx:] self._arr_len = self._arr_len[-idx:] self._len = limit assert sum(self._arr_len) == limit
[docs] def get(self) -> np.ndarray: """ Returns the stored values as a numpy array. Returns ------- np.ndarray Returns the stored values. Examples -------- >>> from ablator.modules.metrics.stores import ArrayStore >>> array_store = ArrayStore( ... batch_limit=10, ... memory_limit=1000 ... ) >>> for i in range(100): >>> array_store.append(np.array([int(i)])) >>> array_store.get() array([[90], [91], [92], [93], [94], [95], [96], [97], [98], [99]]) """ if len(self._arr) == 0: return np.array([[]]) return np.concatenate(self._arr)
def __len__(self) -> int: return self._len def __getitem__(self, idx): idx = self._len + idx if idx < 0 else idx if idx < 0 or idx >= self._len: raise IndexError("list index out of range") cum_sum = np.cumsum(self._arr_len) if (arr_idx := bisect.bisect_right(cum_sum, idx)) > 0: idx -= cum_sum[arr_idx - 1] return self._arr[arr_idx][idx]
[docs] def reset(self): """ Reset list of values to empty. Examples -------- >>> from ablator.modules.metrics.stores import ArrayStore >>> array_store = ArrayStore( ... batch_limit=10, ... memory_limit=1000 ... ) >>> for i in range(100): >>> array_store.append(int(i)) >>> array_store.arr [90, 91, 92, 93, 94, 95, 96, 97, 98, 99] >>> array_store.reset() >>> array_store.arr [] """ self._arr = [] self._arr_len = [] self._len = 0
class PredictionStore:
    """
    A class for storing prediction scores. This allows for evaluating prediction results using evaluation functions

    Parameters
    ----------
    batch_limit : int | None
        Maximum number of rows (samples) to keep for each array store corresponding to
        each category of prediction outputs (e.g preds, labels), so only the latest
        ``batch_limit`` rows are stored per array store. Default is 30.
    memory_limit : int | None
        Maximum memory (in bytes) of rows to keep for each array store corresponding to
        each category of prediction outputs (e.g preds, labels). Default is 1e8.
    moving_average_limit : int
        The maximum number of values allowed to store moving average metrics. Default is 3000.
    evaluation_functions : dict[str, Callable] | list[Callable] | None
        A dictionary of key-value pairs, keys are evaluation function names, values are
        callable evaluation functions, e.g mean, sum. A list of callables is keyed by each
        callable's ``__name__``. Note that arguments to this Callable must match with names
        of prediction batches that the model returns. So if model prediction over a batch
        looks like this: ``{"preds": <batch of predictions>, "labels": <batch of predicted labels>}``,
        then callable's arguments should be ``preds`` and ``labels``, e.g
        ``evaluation_functions= {"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)}``.
        Default is None.

    Raises
    ------
    NotImplementedError
        If the evaluation function is unrecognized.

    Examples
    --------
    >>> from ablator.modules.metrics.stores import PredictionStore
    >>> pred_store = PredictionStore(
    ...     batch_limit=10,
    ...     memory_limit=1000,
    ...     moving_average_limit=1000,
    ...     evaluation_functions={"mean": lambda preds: np.mean(preds)}
    ... )
    """

    def __init__(
        self,
        batch_limit: int | None = 30,
        # 100 MB memory limit
        memory_limit: int | None = int(1e8),
        moving_average_limit: int = 3000,
        evaluation_functions: dict[str, Callable] | list[Callable] | None = None,
    ):
        # Initialize the storage settings.
        super().__init__()
        self.limit = batch_limit
        self.memory_limit = memory_limit
        # Not all arguments of the evaluation functions are required. The overlap between
        # the batch keys and each function's arguments is inferred at the first ``append``
        # and stored in ``self._fn_keys`` (function name -> list of overlapping arguments).
        self._fn_keys: dict[str, list[str]] | None = None
        self._batch_keys: set[str] | None = None
        self.metrics: dict[str, MovingAverage] = {}
        self.__evaluation_functions__ = None
        if isinstance(evaluation_functions, list):
            evaluation_functions_dict = {v.__name__: v for v in evaluation_functions}
        elif isinstance(evaluation_functions, dict):
            evaluation_functions_dict = evaluation_functions
        elif evaluation_functions is None:
            return
        else:
            raise NotImplementedError(
                f"Unrecognized evaluation functions {evaluation_functions}"
            )
        self.metrics = {
            k: MovingAverage(moving_average_limit) for k in evaluation_functions_dict
        }
        self.__evaluation_functions__ = evaluation_functions_dict

    def _init_arr(self, tag: str) -> ArrayStore:
        """Create the ``ArrayStore`` for prediction category ``tag``."""
        attr_name = f"__{tag}_arr__"
        _arr = ArrayStore(batch_limit=self.limit, memory_limit=self.memory_limit)
        setattr(self, attr_name, _arr)
        return getattr(self, attr_name)

    def _get_arr(self, tag: str) -> ArrayStore:
        """Return the ``ArrayStore`` for prediction category ``tag``."""
        attr_name = f"__{tag}_arr__"
        arr = getattr(self, attr_name)
        return arr

    def _init_keys(self, batch_keys: set[str]):
        """Record the batch keys, match them against function arguments, create the stores."""
        missing_keys: list[str] = []
        self._batch_keys = batch_keys
        self._fn_keys = {}
        for k, v in self.evaluation_function_arguments.items():
            overlap = batch_keys.intersection(v)
            if len(overlap) == 0:
                missing_keys.append(f"{k}: {v}")
            else:
                self._fn_keys[k] = list(overlap)
        if len(missing_keys) > 0:
            err_msg = "\n".join(missing_keys)
            raise ValueError(
                f"Batch keys do not match any function arguments: {err_msg}"
            )
        for k in batch_keys:
            self._init_arr(k)

    def _sync_limits(self):
        """Apply the tightest limit among the stores to every store, pruning them to it."""
        assert self._batch_keys is not None
        stores = [self._get_arr(k) for k in self._batch_keys]
        limits = [store.limit for store in stores if store.limit is not None]
        if not limits:
            return
        self.limit = min(limits)
        for store in stores:
            store.limit = self.limit
            if len(store) > self.limit:
                # A sibling store lowered its limit (e.g. memory pressure). Prune now so
                # that every store keeps the same, aligned, latest rows.
                store._prune(self.limit)  # pylint: disable=protected-access

    def append(self, **batches: dict[str, np.ndarray]):
        """
        Appends batches of values, constrained on the limits in unison.

        Parameters
        ----------
        **batches : dict[str, np.ndarray]
            A dictionary of key-value pairs, where key is type of prediction (e.g
            predictions, labels), and value is a batch of prediction values of shape
            ``[B, ...]``. Note that the passed keys in ``**batches`` must match arguments
            in evaluation functions arguments in the Callable in ``evaluation_functions``
            when we initialize ``PredictionStore`` object.

        Raises
        ------
        ValueError
            If no batches are passed, or (on the first call) if the passed keys do not
            match arguments of some evaluation function.
        AssertionError
            If the keys differ from the keys of the first call, or when batches among the
            keys are different in size. Nothing is appended in that case.

        Examples
        --------
        >>> from ablator.modules.metrics.stores import PredictionStore
        >>> pred_store = PredictionStore(
        ...     batch_limit=10,
        ...     memory_limit=1000,
        ...     moving_average_limit=1000,
        ...     evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)}
        ... )
        >>> pred_store.append(preds=np.array([[4], [3], [0]]), labels=np.array([[5], [1], [1]]))
        """
        batch_keys = set(batches.keys())
        if len(batch_keys) == 0:
            raise ValueError("Must provide keyed batch arguments.")
        if self._batch_keys is None:
            self._init_keys(batch_keys=batch_keys)
        assert self._batch_keys is not None
        assert self._batch_keys == batch_keys, (
            f"Inhomogeneous keys from the prediction store update. "
            f"Expected: {sorted(self._batch_keys)}, received {sorted(batch_keys)}"
        )
        # Convert and size-check every batch before touching the stores, so that a
        # mismatch does not leave the stores with different lengths.
        np_batches = {k: butils.iter_to_numpy(v) for k, v in batches.items()}
        sizes = {k: len(v) for k, v in np_batches.items()}
        assert (
            len(set(sizes.values())) == 1
        ), f"Inhomegenous batches between inputs. Sizes: {sizes}"
        for k, np_arr in np_batches.items():
            self._get_arr(k).append(np_arr)
        self._sync_limits()

    def evaluate(self) -> dict[str, float]:
        """
        Apply evaluation_functions to predictions sets, e.g preds, labels.

        Each result is also appended to the corresponding moving average in
        ``self.metrics``.

        Returns
        -------
        dict[str, float]
            A dictionary of metric values calculated from different sets of predictions.

        Raises
        ------
        AssertionError
            If passed keys do not match arguments in evaluation functions.
        ValueError
            If evaluation result is not a numeric scalar.

        Examples
        --------
        >>> from ablator.modules.metrics.stores import PredictionStore
        >>> pred_store = PredictionStore(
        ...     batch_limit=30,
        ...     evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)},
        ...     moving_average_limit=100
        ... )
        >>> pred_store.append(preds=np.array([[4], [3], [0]]), labels=np.array([[5], [1], [3]]))
        >>> pred_store.evaluate()
        {'mean': 5.333333333333334}
        """
        metrics: dict[str, float] = {}
        if (
            self._batch_keys is None
            or self.__evaluation_functions__ is None
            or len(self._batch_keys) == 0
        ):
            return metrics
        batches = {k: self._get_arr(k).get() for k in self._batch_keys}
        for name, fn in self.__evaluation_functions__.items():
            fn_args = set(inspect.signature(fn).parameters.keys())
            intersecting_args = self._batch_keys.intersection(fn_args)
            assert (
                len(intersecting_args) > 0
            ), f"Evaluation function arguments {fn_args} different than stored predictions: {self._batch_keys}"
            metric = fn(**{arg: batches[arg] for arg in intersecting_args})
            try:
                # parsing is inside the try so unsupported return types get the
                # informative message below
                metric = _parse_moving_average_val(metric)
                self.metrics[name].append(metric)
            except Exception as exc:
                fn_name = getattr(fn, "__name__", name)
                raise ValueError(
                    f"Invalid value {metric} returned by evaluation function {fn_name}. Must be numeric scalar."
                ) from exc
            metrics[name] = metric
        return metrics

    @property
    def evaluation_function_arguments(self) -> dict[str, list[str]]:
        """Map each evaluation function name to its parameter names."""
        if self.__evaluation_functions__ is None:
            return {}
        return {
            k: list(inspect.signature(v).parameters.keys())
            for k, v in self.__evaluation_functions__.items()
        }

    def get(self) -> dict[str, np.ndarray] | None:
        """Return the stored arrays per batch key, or ``None`` before the first ``append``."""
        if self._batch_keys is None:
            return None
        return {k: self._get_arr(k).get() for k in self._batch_keys}

    def reset(self):
        """
        Reset to empty all prediction sequences (e.g predictions, labels).

        The moving averages in ``self.metrics`` are not reset.

        Examples
        --------
        >>> from ablator.modules.metrics.stores import PredictionStore
        >>> pred_store = PredictionStore(
        ...     batch_limit=30,
        ...     memory_limit=None,
        ...     evaluation_functions={"mean": lambda preds: np.mean(preds)},
        ...     moving_average_limit=100
        ... )
        >>> pred_store.append(preds=np.array([[4], [3], [0]]), labels=np.array([[5], [1], [3]]))
        >>> pred_store.reset()
        """
        if self._batch_keys is None:
            return
        for k in self._batch_keys:
            self._get_arr(k).reset()
class MovingAverage(ArrayStore):
    """
    This class is used to store moving average metrics.

    Each appended value must be a scalar and is stored as a single row, so
    ``batch_limit`` is the number of most recent values that are averaged. The instance
    behaves like a float equal to ``value`` in comparisons, ``float()`` and ``format()``;
    comparisons also accept another ``MovingAverage``.
    """

    @staticmethod
    def _coerce_other(other):
        # Compare against another moving average through its float value. Any other
        # operand is passed through unchanged so float's own comparison semantics
        # (including returning NotImplemented) still apply.
        if isinstance(other, MovingAverage):
            return float(other)
        return other

    @property
    def __mean__(self):
        return np.mean(self._arr)

    @property
    def last(self):
        """The most recently appended value, or ``np.nan`` when empty."""
        if len(self._arr) > 0:
            return self._arr[-1].item()
        return np.nan

    @property
    def value(self):
        """The mean of the stored values, or ``np.nan`` when empty."""
        if len(self._arr) > 0:
            return self.__mean__
        return np.nan

    def __lt__(self, __o: float) -> bool:
        return float(self.value).__lt__(self._coerce_other(__o))

    def __gt__(self, __o: float) -> bool:
        return float(self.value).__gt__(self._coerce_other(__o))

    def __eq__(self, __o: object) -> bool:
        return float(self.value).__eq__(self._coerce_other(__o))

    def __float__(self) -> float:
        return float(self.value)

    def __format__(self, format_spec=".2e") -> str:
        return format(self.value, format_spec)

    def __repr__(self) -> str:
        return f"{self.value:.2e}"

    # flake8: noqa: DOC502
    def append(self, val: ty.Union[np.ndarray, torch.Tensor, float, int]):
        """
        Appends a single scalar value, constrained on the limits.

        Parameters
        ----------
        val : ty.Union[np.ndarray, torch.Tensor, float, int]
            The data to be appended: a number or a single-element array / tensor.

        Raises
        ------
        ValueError:
            If the appended value is not of a supported type, or if val is not a scalar.

        Examples
        --------
        >>> from ablator.modules.metrics.stores import MovingAverage
        >>> ma_store = MovingAverage(batch_limit=30)
        >>> for i in range(100):
        ...     ma_store.append(np.array([i]))
        >>> len(ma_store)
        30
        >>> float(ma_store)
        84.5
        """
        scalar = _parse_moving_average_val(val)
        super().append(scalar)