ablator.main package

Subpackages

Submodules

ablator.main.mp module

class ablator.main.mp.ParallelTrainer(wrapper: ModelWrapper, run_config: ParallelConfig)

Bases: ProtoTrainer

A trainer that uses ray to run multiple training processes in parallel, one for each model configuration.

Parameters:
wrapper : ModelWrapper

The model wrapper for the ParallelTrainer.

run_config : ParallelConfig

The runtime configuration for this trainer.

Examples

The following workflow launches a parallel experiment with ParallelTrainer, from defining the configs and preparing the model wrapper to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config
... )
  • Define model config. Here we want to run HPO on the activation function and the model's hidden size:

>>> @configclass
... class CustomModelConfig(ModelConfig):
...     hidden_size: int
...     activation: str
...
>>> model_config = CustomModelConfig(hidden_size=100, activation="relu")
  • Define search space:

>>> search_space = {
...     "train_config.optimizer_config.arguments.lr": SearchSpace(
...         value_range = [0.001, 0.01],
...         value_type = 'float'
...         ),
...     "model_config.hidden_size": SearchSpace(value_range = [32, 64], value_type = 'int'),
...     "model_config.activation": SearchSpace(categorical_values = ["relu", "elu", "leakyRelu"]),
... }
  • Define run config. Remember to subclass ParallelConfig so that the model config type is CustomModelConfig:

>>> @configclass
... class CustomParallelConfig(ParallelConfig):
...     model_config: CustomModelConfig
...
>>> parallel_config = CustomParallelConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments/",
...     device="cuda",
...     amp=True,
...     random_seed = 42,
...     total_trials = 20,
...     concurrent_trials = 3,
...     search_space = search_space,
...     optim_metrics = {"val_loss": "min"},
...     optim_metric_name = "val_loss",
...     gpu_mb_per_experiment = 1024
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • With all configurations and the model wrapper in place, initialize and launch the parallel trainer:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ParallelTrainer(
...     wrapper=wrapper,
...     run_config=parallel_config,
... )
>>> import os
>>> ablator.launch(working_directory=os.getcwd(), ray_head_address=None)
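The dotted keys in the search space of the example above (such as "model_config.hidden_size") address nested fields of the run config. The following is a minimal, library-independent sketch of that idea and not ablator's implementation: `sample_value`, `set_by_path` and `make_trial_config` are hypothetical helpers, and plain dictionaries stand in for the config classes and for SearchSpace.

```python
import copy
import random


def sample_value(space: dict, rng: random.Random):
    """Draw one value from a simplified search-space entry (hypothetical format)."""
    if "categorical_values" in space:
        return rng.choice(space["categorical_values"])
    low, high = space["value_range"]
    if space["value_type"] == "int":
        return rng.randint(low, high)  # inclusive on both ends
    return rng.uniform(low, high)


def set_by_path(config: dict, dotted_key: str, value) -> None:
    """Assign `value` to the nested field addressed by a dotted key."""
    *parents, leaf = dotted_key.split(".")
    node = config
    for name in parents:
        node = node[name]
    node[leaf] = value


def make_trial_config(base: dict, search_space: dict, seed: int) -> dict:
    """Copy the base config and overwrite every searched field with a sample."""
    rng = random.Random(seed)
    trial = copy.deepcopy(base)  # leave the base config untouched
    for dotted_key, space in search_space.items():
        set_by_path(trial, dotted_key, sample_value(space, rng))
    return trial


# Plain-dict stand-ins mirroring the example configuration above.
base_config = {
    "train_config": {"optimizer_config": {"arguments": {"lr": 0.5, "weight_decay": 0.5}}},
    "model_config": {"hidden_size": 100, "activation": "relu"},
}
search_space = {
    "train_config.optimizer_config.arguments.lr": {
        "value_range": [0.001, 0.01],
        "value_type": "float",
    },
    "model_config.hidden_size": {"value_range": [32, 64], "value_type": "int"},
    "model_config.activation": {"categorical_values": ["relu", "elu", "leakyRelu"]},
}

trial_config = make_trial_config(base_config, search_space, seed=42)
```

Each call with a different seed yields a different trial configuration, while fields outside the search space (here, weight_decay) keep their base values.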
Attributes:
run_config : ParallelConfig

Running configuration for parallel training.

logger : RemoteFileLogger

A centralized logger that writes messages to a file and prints them to the console.

experiment_state : ExperimentState

This attribute manages optuna trials.

gpu_manager : ty.Optional[GPUManager]

A GPU manager that manages GPU resources in the cluster.

available_resources : dict[str, Resource]

A dictionary of available resources on each node.

node_manager : NodeManager

A node manager that manages nodes and their resources.

ray_address : str

The address of the ray cluster.

total_trials : int

Total number of trials to run.

launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = None, resume: bool = False, excluding_files: list[str] | None = None)

Set up and launch the parallel ablation experiment. This sets up a ray cluster and pushes trials of different configurations (newly initialized or retrieved) to the cluster to run in parallel.

Parameters:
working_directory : str

The working directory that stores the code and modules that will be used by ray.

auxilary_modules : list[tys.ModuleType] | None

A list of modules to include in the ray cluster's working environment.

ray_head_address : str | None

Ray cluster address.

resume : bool

Whether to resume training the model from existing checkpoints and existing experiment state, by default False.

excluding_files : list[str] | None

A list of file patterns in .gitignore format that will be excluded from the upload to the ray cluster. If unspecified, the .git/** folder is ignored.
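As a rough standard-library analogy for running many trials with a bounded number in flight, the sketch below executes `total_trials` placeholder jobs with at most `concurrent_trials` running at once. It is not how ablator or ray schedule work: `run_trial` and `run_all` are hypothetical names, a thread pool stands in for the ray cluster, and the returned metric is a placeholder rather than a real training result.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_trial(trial_id: int) -> dict:
    """Hypothetical stand-in for training one configuration.

    Returns a placeholder metric instead of a real validation loss.
    """
    return {"trial_id": trial_id, "val_loss": 1.0 / (trial_id + 1)}


def run_all(total_trials: int, concurrent_trials: int) -> list[dict]:
    """Run all trials, keeping at most `concurrent_trials` running at a time."""
    results = []
    with ThreadPoolExecutor(max_workers=concurrent_trials) as pool:
        futures = [pool.submit(run_trial, i) for i in range(total_trials)]
        # Collect results in completion order, which may differ from submission order.
        for future in as_completed(futures):
            results.append(future.result())
    return sorted(results, key=lambda r: r["trial_id"])


results = run_all(total_trials=20, concurrent_trials=3)
# With an optimization direction of "min", the best trial has the lowest metric.
best = min(results, key=lambda r: r["val_loss"])
```

The two arguments mirror the total_trials and concurrent_trials fields of the run config shown in the class example; the final line mirrors an optim_metrics entry of {"val_loss": "min"}.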

pre_train_setup()

Prepares resources ahead of time to avoid stalling during training or when resources are shared between trainers.

property total_trials: int

ablator.main.proto module

class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)

Bases: object

Manages resources for prototyping. This trainer runs an experiment with a single prototype model, so it performs neither an ablation study nor HPO.

Parameters:
wrapper : ModelWrapper

The main model wrapper.

run_config : RunConfig

Running configuration for the model.

Raises:
RuntimeError

If the experiment directory is not defined in the running configuration.

Examples

The following workflow launches a prototype experiment with ProtoTrainer, from defining the configs to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config
... )
  • Define model config. We use the default one with no custom hyperparameters. To run an ablation study or HPO on the model's hyperparameters in a parallel experiment, you would customize it and use ParallelTrainer and ParallelConfig instead of ProtoTrainer and RunConfig:

>>> model_config = ModelConfig()
  • Define run config:

>>> run_config = RunConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments",
...     device="cpu",
...     amp=False,
...     random_seed = 42
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • With all configurations and the model wrapper in place, initialize and launch the prototype trainer. Launching the experiment requires a working directory that points to a git repository, which is used to keep track of code differences:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ProtoTrainer(
...     wrapper=wrapper,
...     run_config=run_config,
... )
>>> import os
>>> metrics = ablator.launch(working_directory=os.getcwd())  # assumes the current directory is tracked by git
Attributes:
wrapper : ModelWrapper

The main model wrapper.

run_config : RunConfig

Running configuration for the model.

experiment_dir : Path

The path object to the experiment directory.

evaluate() -> dict[str, dict[str, Any]]

Run model evaluation on the training results and sync the evaluation results to external logging services (e.g., Google Cloud Storage or other remote servers).

Returns:
metrics : dict[str, dict[str, ty.Any]]

Metrics returned after evaluation.

launch(working_directory: str, debug: bool = False) -> dict[str, float]

Launch the prototype experiment (train and evaluate the single prototype model) and return metrics.

Parameters:
working_directory : str

The working directory, which points to a git repository that is used to keep track of code differences.

debug : bool, optional

Whether to train models in debug mode, by default False.

Returns:
metrics : dict[str, float]

Metrics returned after training.

pre_train_setup()

Prepares resources ahead of time to avoid stalling during training or when resources are shared between trainers.

smoke_test(config: RunConfig | None = None)

Run a smoke test training process on the model.

Parameters:
config : RunConfig | None

Running configuration for the model.

Examples

>>> try:
...     ablator.smoke_test(run_config)
... except Exception as err:
...     raise err

Module contents