import multiprocessing as mp
import traceback
from logging import warning
from pathlib import Path
import builtins
import numpy as np
import pandas as pd
import ray
from joblib import Memory
from ablator.config.main import ConfigBase
from ablator.config.mp import ParallelConfig, SearchSpace
from ablator.config.proto import Optim, RunConfig
[docs]def read_result(config_type: type[ConfigBase], json_path: Path) -> pd.DataFrame | None:
"""
Read the results of an experiment and return them as a pandas ``DataFrame``.
The function reads the data from a JSON file, processes each row, and appends
experiment attributes from a YAML configuration file. The resulting ``DataFrame``
is indexed and returned.
Parameters
----------
config_type : type[ConfigBase]
The type of configuration class that is used to load the experiment
configuration from a YAML file.
json_path : Path
The path to the JSON file containing the results of the experiment.
Returns
-------
pd.DataFrame | None
A pandas ``DataFrame`` containing the processed experiment results.
Returns ``None`` if there was an error in reading the ``json_path`` results.
Examples
--------
Suppose result json file ``/tmp/myexperiment/results.json`` contains:
>>> json.load("results.json")
[{
"train_loss": 10.35,
"val_loss": NaN,
"current_epoch": 1,
},
{
"train_loss": 3.89,
"val_loss": 7.04,
"current_epoch": 2,
}]
And the corresponding configuration object ``run_config`` is created as:
>>> config = {
... "model_config": {},
... "train_config": {
... 'dataset': 'Fashion-mnist',
... 'batch_size': 32,
... 'epochs': 20,
... 'optimizer_config': {
... 'name': 'adam',
... 'arguments': {
... 'betas': (0.9, 0.999), 'weight_decay': 0.0, 'lr': 0.001
... }
... },
... 'scheduler_config': None
... },
... "experiment_dir": '/tmp/experiments',
... "random_seed": 42,
... # ... other configs
... "optim_metrics": None,
... "optim_metric_name": None
... }
>>> run_config = RunConfig(**config)
The function ``read_result`` will return a pandas data frame like below:
>>> read_result(run_config, Path("/tmp/myexperiment/results.json"))
experiment_dir keep_n_checkpoints ... train_loss val_loss current_epoch
trial_uid step
experiments_ 0 C:/tmp/experiments 3 ... 10.35 NaN 1
1 C:/tmp/experiments 3 ... 3.89 7.04 2
"""
try:
experiment_config = config_type.load(
json_path.parent.joinpath("config.yaml"), debug=True
)
experiment_attributes = experiment_config.make_dict(
experiment_config.annotations, flatten=True
)
df = pd.read_json(json_path)
df = pd.concat([pd.DataFrame([experiment_attributes] * len(df)), df], axis=1)
df.index.name = "step"
df.reset_index(inplace=True)
df["trial_uid"] = json_path.parent.name
return df.set_index(["trial_uid", "step"])
# pylint: disable=broad-exception-caught
except builtins.Exception:
traceback.print_exc()
return None
[docs]class Results:
"""
Class for processing experiment results. You can use this class to read the results in an
experiment output directory. This can be used in combination with ``PlotAnalysis`` to show the
correlation between hyperparameters and metrics. Refer to
`Interpreting Results <./notebooks/Interpreting-results.ipynb>`_ tutorial for more details on plotting
and interpreting experiment results.
Parameters
----------
config : type[ParallelConfig] | ParallelConfig
The configuration class used
experiment_dir : str | Path
The path to the experiment directory.
cache : bool
Whether to cache the results, by default ``False``.
use_ray : bool
Whether to use ray for parallel processing, by default ``False``.
Attributes
----------
experiment_dir : Path
The path to the experiment directory.
config : type[ParallelConfig]
The configuration class used.
metric_map : dict[str, Optim]
A dictionary mapping metric names to their optimization direction.
data: pd.DataFrame
The processed results of the experiment. Refer to ``read_results`` for more details.
config_attrs: list[str]
The list of all the optimizable hyperparameter names
search_space: dict[str, ty.Any]
All the search space of the experiment.
numerical_attributes: list[str]
The list of all the numerical hyperparameter names
categorical_attributes: list[str]
The list of all the categorical hyperparameter names.
Raises
------
FileNotFoundError
If the experiment directory doesn't exists.
ValueError
If ``RunConfig`` is provided instead of ``ParallelConfig``.
Examples
--------
Suppose you have an experiment output directory stored at ``<path to experiment output defined in
config experiment_dir>``. You can read the results from the directory as follows:
>>> directory_path = Path('<path to experiment output defined in config experiment_dir>')
>>> results = Results(config=ParallelConfig, experiment_dir=directory_path, use_ray=True)
>>> df = results.read_results(config_type=ParallelConfig, experiment_dir=directory_path)
Pass ``df`` to ``PlotAnalysis`` to create an analysis object for plotting the correlation between
the hyperparameters and the metrics and save the plots to an output directory. For example, the following
template generates plots for each of the numerical and categorical hyperparameters and saves them to
``./plots`` directory. Here "Validation Accuracy" is the name of the main metric.
>>> analysis = PlotAnalysis(
... df,
... save_dir="./plots",
... cache=True,
... optim_metrics={"val_accuracy": Optim.max},
... numerical_attributes=<numerical name remap keys names>,
... categorical_attributes=<categorical name remap keys names>,
... )
>>> analysis.make_figures(
... metric_name_remap={
... "val_accuracy": "Validation Accuracy",
... },
... attribute_name_remap= attribute_name_remap
... )
"""
def __init__(
self,
config: type[ParallelConfig] | ParallelConfig,
experiment_dir: str | Path,
cache: bool = False,
use_ray: bool = False,
) -> None:
if not isinstance(config, type):
config_type = type(config)
else:
config_type = config
if issubclass(config_type, RunConfig) and not issubclass(
config_type, ParallelConfig
):
raise ValueError(
"Provided a ``RunConfig`` used for a single-trial. Analysis "
"is not meaningful for a single trial. Please provide a ``ParallelConfig``."
)
if not issubclass(config_type, ParallelConfig):
raise ValueError(
"Configuration must be subclassed from ``ParallelConfig``. "
)
# TODO parse results from experiment directory as opposed to configuration.
# Need a way to derived MPConfig implementation from a pickled file.
# We need the types of the configuration, metric map.
self.experiment_dir = Path(experiment_dir)
run_config_path = self.experiment_dir.joinpath("default_config.yaml")
if not run_config_path.exists():
raise FileNotFoundError(f"{run_config_path}")
self.config = config_type.load(run_config_path)
# TODO we need to have mypy plugin to interpret Dict[Optim] as dict[str, Optim]
self.metric_map: dict[str, Optim] = self.config.optim_metrics # type: ignore[assignment]
self.data: pd.DataFrame = self._make_data(use_ray=use_ray, clean=not cache)
self.config_attrs: list[str] = list(self.config.search_space.keys())
self.search_space: dict[str, SearchSpace] = self.config.search_space
# NOTE possibly a good idea to set small range integer attributes to categorical
self.numerical_attributes = [
k for k, v in self.search_space.items() if v.value_range is not None
]
self.categorical_attributes = [
k for k, v in self.search_space.items() if v.categorical_values is not None
]
self._assert_cat_attributes(self.categorical_attributes)
def _make_data(self, use_ray: bool = False, clean: bool = False):
memory = Memory(self.experiment_dir.joinpath(".cache"), verbose=0)
_parse_results = memory.cache(self._parse_results, ignore=["self", "init_ray"])
if clean:
_parse_results.clear()
return _parse_results(self.experiment_dir, init_ray=use_ray)
def _assert_cat_attributes(self, categorical_attributes: list[str]):
"""
Check if the categorical attributes are imbalanced
default ratio is 0.8, which means if the most frequent value
is more than 80% every other kind of values, then it is imbalanced
Parameters
----------
categorical_attributes : list[str]
list of categorical attributes
Examples
--------
>>> [X,X,Y,Z]imbalanced
>>> [X,Y,Z]balanced
>>> [X,X Y,Y,Z]balanced
"""
for attr in categorical_attributes:
value_counts = self.data[attr].value_counts()
unique_values, counts = np.array(value_counts.index), value_counts.values
imbalance_ratio_cut_off = 0.8
imbalanced_values = unique_values[
counts / counts.max() > (1 - imbalance_ratio_cut_off)
]
if len(imbalanced_values) == 1:
warning(
f"Imbalanced trials for attr {attr} and values: {unique_values} with counts {counts}."
)
@property
def metric_names(self) -> list[str]:
"""
Get the list of all optimize directions
Returns
-------
list[str]
list of optimize metric names
Examples
--------
>>> results.metric_names
["val_loss", "train_loss", "val_acc", "train_acc"]
"""
return [str(v) for v in self.metric_map.values()]
def _parse_results(
self,
experiment_dir: Path,
init_ray: bool,
) -> pd.DataFrame:
"""
Read multiple results from experiment directory with ray to enable parallel processing.
Parameters
----------
experiment_dir : Path
The experiment directory
init_ray : bool
Whether to use ray for parallel processing
Returns
-------
pd.DataFrame
Pandas Dataframe from read_results.
"""
assert (
experiment_dir.exists()
), f"Experiment directory {experiment_dir} does not exist. You can provide one as an argument `experiment_dir`"
if init_ray and not ray.is_initialized():
ray.init(address="local")
return self.read_results(type(self.config), experiment_dir)
[docs] @classmethod
def read_results(
cls,
config_type: type[ConfigBase],
experiment_dir: Path | str,
num_cpus: float | None = None,
) -> pd.DataFrame:
"""
Read all experiment results from the experiment directory (with ray if specified when initializing ``Result``).
Parameters
----------
config_type : type[ConfigBase]
The configuration class.
experiment_dir : Path | str
The experiment directory.
num_cpus : float | None
Number of CPUs to use for ray processing, by default ``None``.
Returns
-------
pd.DataFrame
A data frame of all the results from all experiments in ``experiment_dir``.
Raises
------
RuntimeError
If no results are present in the experiment directory.
Examples
--------
>>> results.read_results(config_type = ParallelConfig, experiment_dir = "/tmp/results/experiment_8925_9991/")
train_loss val_loss best_iteration best_loss current_epoch current_iteration epochs
13.3658738 0 inf 1 100 5
2.277102967 0.277085876 100 0.277085876 2 200 5
2.277154112 0.27619998 200 0.27619998 3 300 5
2.276529543 0.286987235 200 0.27619998 4 400 5
2.279828385 0.274052692 400 0.274052692 5 500 5
11.91869608 0 inf 1 100 5
"""
results: list[pd.DataFrame] = []
futures: list[ray.ObjectRef] = []
json_paths = list(Path(experiment_dir).rglob("results.json"))
if len(json_paths) == 0:
raise RuntimeError(f"No results found in {experiment_dir}")
cpu_count = mp.cpu_count()
if num_cpus is None:
num_cpus = len(json_paths) / (cpu_count * 4)
json_path = None
for json_path in json_paths:
if ray.is_initialized():
futures.append(
ray.remote(num_cpus=num_cpus)(read_result).remote(
config_type, json_path
)
)
else:
results.append(read_result(config_type, json_path))
if ray.is_initialized() and len(json_paths) > 0:
# smoke test
read_result(config_type, json_path)
results = ray.get(futures)
results = list(filter(lambda x: x is not None, results))
return pd.concat(results)