Source code for ablator.analysis.results

import multiprocessing as mp
import traceback
from logging import warning
from pathlib import Path
import builtins

import numpy as np
import pandas as pd
import ray
from joblib import Memory

from ablator.config.main import ConfigBase
from ablator.config.mp import Optim, ParallelConfig, SearchSpace
from ablator.config.proto import RunConfig


[docs]def read_result(config_type: type[ConfigBase], json_path: Path) -> pd.DataFrame | None: """ Read the results of an experiment and return them as a pandas DataFrame. The function reads the data from a JSON file, processes each row, and appends experiment attributes from a YAML configuration file. The resulting DataFrame is indexed and returned. Parameters ---------- config_type : type[ConfigBase] The type of the configuration class that is used to load the experiment configuration from a YAML file. json_path : Path The path to the JSON file containing the results of the experiment. Returns ------- pd.DataFrame | None A pandas DataFrame containing the processed experiment results. Returns None if there was an error in reading the json_path results. Examples -------- >>> result json file: { "run_id": "run_1", "accuracy": 0.85, "loss": 0.35 } { "run_id": "run_2", "accuracy": 0.87, "loss": 0.32 } >>> config file experiment_name: "My Experiment" batch_size: 64 >>> return value run_id accuracy loss experiment_name batch_size path 0 run_1 0.85 0.35 My Experiment 64 path/to/experiment 1 run_2 0.87 0.32 My Experiment 64 path/to/experiment """ try: experiment_config = config_type.load(json_path.parent.joinpath("config.yaml")) experiment_attributes = experiment_config.make_dict( experiment_config.annotations, flatten=True ) df = pd.read_json(json_path) df = pd.concat([pd.DataFrame([experiment_attributes] * len(df)), df], axis=1) df.index.name = "step" df.reset_index(inplace=True) df["trial_uid"] = json_path.parent.name return df.set_index(["trial_uid", "step"]) # pylint: disable=broad-exception-caught except builtins.Exception: traceback.print_exc() return None
[docs]class Results: """ Class for processing experiment results. You can use this class to read the results in an experiment output directory. This can be used in combination with ``PlotAnalysis`` to show the correlation between hyperparameters and metrics. Refer to :ref:`Interpreting Results <interpret_results>` tutorial for more details on plotting and interpreting experiment results. Examples -------- >>> directory_path = Path('<path to experiment output defined in experiment_dir>') >>> results = Results(config = ParallelConfig, experiment_dir=directory_path, use_ray=True) >>> df = results.read_results(config_type=ParallelConfig, experiment_dir=directory_path) Pass ``df`` to ``PlotAnalysis`` to create an analysis object that's able to plot the correlation between the hyperparameters and metrics and save the plots to an output directory. For example, the following code snippet generates plots for each of the numerical and categorical hyperparameters and saves them to ``./plots`` directory. Here "Validation Accuracy" is the name of the main metric. >>> analysis = PlotAnalysis( ... df, ... save_dir="./plots", ... cache=True, ... optim_metrics={"val_accuracy": Optim.max}, ... numerical_attributes=<numerical name remap keys names>, ... categorical_attributes=<categorical name remap keys names>, ... ) >>> analysis.make_figures( ... metric_name_remap={ ... "val_accuracy": "Validation Accuracy", ... }, ... attribute_name_remap= attribute_name_remap ... ) Parameters ---------- config : type[ParallelConfig] The configuration class used experiment_dir : str | Path The path to the experiment directory. cache : bool, optional Whether to cache the results, by default ``False`` use_ray : bool, optional Whether to use ray for parallel processing, by default ``False`` Attributes ---------- experiment_dir : Path The path to the experiment directory. config : type[ParallelConfig] The configuration class used metric_map : dict[str, Optim] A dictionary mapping optimize metric names to their optimization direction. data: pd.DataFrame The processed results of the experiment. Refer ``read_results`` for more details. config_attrs: list[str] The list of all the optimizable hyperparameter names search_space: dict[str, ty.Any] All the search space of the experiment. numerical_attributes: list[str] The list of all the numerical hyperparameter names categorical_attributes: list[str] The list of all the categorical hyperparameter names """ def __init__( self, config: type[ParallelConfig] | ParallelConfig, experiment_dir: str | Path, cache: bool = False, use_ray: bool = False, ) -> None: if not isinstance(config, type): config_type = type(config) else: config_type = config if issubclass(config_type, RunConfig) and not issubclass( config_type, ParallelConfig ): raise ValueError( "Provided a ``RunConfig`` used for a single-trial. Analysis " "is not meaningful for a single trial. Please provide a ``ParallelConfig``." ) if not issubclass(config_type, ParallelConfig): raise ValueError( "Configuration must be subclassed from ``ParallelConfig``. " ) # TODO parse results from experiment directory as opposed to configuration. # Need a way to derived MPConfig implementation from a pickled file. # We need the types of the configuration, metric map. self.experiment_dir = Path(experiment_dir) run_config_path = self.experiment_dir.joinpath("default_config.yaml") if not run_config_path.exists(): raise FileNotFoundError(f"{run_config_path}") self.config = config_type.load(run_config_path) self.metric_map: dict[str, Optim] = self.config.optim_metrics self._make_cache(clean=not cache) self.data: pd.DataFrame = self._parse_results( self.experiment_dir, init_ray=use_ray ) self.config_attrs: list[str] = list(self.config.search_space.keys()) self.search_space: dict[str, SearchSpace] = self.config.search_space # NOTE possibly a good idea to set small range integer attributes to categorical self.numerical_attributes = [ k for k, v in self.search_space.items() if v.value_range is not None ] self.categorical_attributes = [ k for k, v in self.search_space.items() if v.categorical_values is not None ] self._assert_cat_attributes(self.categorical_attributes) def _make_cache(self, clean=False): memory = Memory(self.experiment_dir.joinpath(".cache"), verbose=0) self._parse_results = memory.cache( self._parse_results, ignore=["self", "init_ray"] ) if clean: self._parse_results.clear() def _assert_cat_attributes(self, categorical_attributes: list[str]): """ Check if the categorical attributes are imbalanced default ratio is 0.8, which means if the most frequent value is more than 80% every other kind of values, then it is imbalanced Parameters ---------- categorical_attributes : list[str] list of categorical attributes Raises ------ AssertionError if the categorical attributes are imbalanced Examples -------- >>> [X,X,Y,Z]imbalanced >>> [X,Y,Z]balanced >>> [X,X Y,Y,Z]balanced """ for attr in categorical_attributes: value_counts = self.data[attr].value_counts() unique_values, counts = np.array(value_counts.index), value_counts.values imbalance_ratio_cut_off = 0.8 imbalanced_values = unique_values[ counts / counts.max() > (1 - imbalance_ratio_cut_off) ] if len(imbalanced_values) == 1: warning( f"Imbalanced trials for attr {attr} and values: {unique_values} with counts {counts}." ) @property def metric_names(self) -> list[str]: """ Get the list of all optimize directions Returns ------- list[str] list of optimize metric names """ return [str(v) for v in self.metric_map.values()] # method-hidden because we over-write it with the cached version. # pylint: disable=method-hidden def _parse_results( self, experiment_dir: Path, init_ray: bool, ): """ Read multiple results from experiment directory with ray to enable parallel processing. Parameters ---------- experiment_dir : Path The experiment directory init_ray : bool Whether to use ray for parallel processing """ assert ( experiment_dir.exists() ), f"Experiment directory {experiment_dir} does not exist. You can provide one as an argument `experiment_dir`" if init_ray and not ray.is_initialized(): ray.init(address="local") return self.read_results(type(self.config), experiment_dir)
[docs] @classmethod def read_results( cls, config_type: type[ConfigBase], experiment_dir: Path | str, num_cpus=None, ) -> pd.DataFrame: """ Read multiple results from experiment directory with ray to enable parallel processing. This function calls ``read_result`` many times, refer to ``read_result`` for more details. Parameters ---------- config_type : type[ConfigBase] The configuration class experiment_dir : Path | str The experiment directory num_cpus : int, optional Number of CPUs to use for ray processing, by default ``None`` Returns ------- pd.DataFrame A dataframe of all the results """ results: list[pd.DataFrame] = [] futures: list[ray.ObjectRef] = [] json_paths = list(Path(experiment_dir).rglob("results.json")) if len(json_paths) == 0: raise RuntimeError(f"No results found in {experiment_dir}") cpu_count = mp.cpu_count() if num_cpus is None: num_cpus = len(json_paths) / (cpu_count * 4) json_path = None for json_path in json_paths: if ray.is_initialized(): futures.append( ray.remote(num_cpus=num_cpus)(read_result).remote( config_type, json_path ) ) else: results.append(read_result(config_type, json_path)) if ray.is_initialized() and len(json_paths) > 0: # smoke test read_result(config_type, json_path) results = ray.get(futures) results = list(filter(lambda x: x is not None, results)) return pd.concat(results)