ablator.main package#

Subpackages#

Submodules#

ablator.main.mp module#

class ablator.main.mp.ParallelTrainer(wrapper: ModelWrapper, run_config: ParallelConfig)[source]#

Bases: ProtoTrainer

A class for parallelizing training and hyperparameter optimization of models of different configurations with ray.

Examples

Below is a complete workflow on how to launch a parallel experiment with ParallelTrainer, from defining config, getting the model wrapper ready, to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config,
...     rand_weights_init = True
... )
  • Define the model config. Here we want to run HPO on the activation function and the model's hidden size:

>>> @configclass
... class CustomModelConfig(ModelConfig):
...     hidden_size: int
...     activation: str
>>> model_config = CustomModelConfig(hidden_size=32, activation="relu")
  • Define search space:

>>> search_space = {
...     "train_config.optimizer_config.arguments.lr": SearchSpace(
...         value_range = [0.001, 0.01],
...         value_type = 'float'
...         ),
...     "model_config.hidden_size": SearchSpace(value_range = [32, 64], value_type = 'int'),
...     "model_config.activation": SearchSpace(categorical_values = ["relu", "elu", "leakyRelu"]),
... }
  • Define the run config (remember to subclass ParallelConfig so that its model_config field is typed as CustomModelConfig):

>>> @configclass
... class CustomParallelConfig(ParallelConfig):
...     model_config: CustomModelConfig
>>>
>>> parallel_config = CustomParallelConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments/",
...     device="cuda",
...     amp=True,
...     random_seed = 42,
...     total_trials = 20,
...     concurrent_trials = 20,
...     search_space = search_space,
...     optim_metrics = {"val_loss": "min"},
...     gpu_mb_per_experiment = 1024,
...     cpus_per_experiment = 1,
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • After gathering all configurations and model wrapper, we can initialize and launch the parallel trainer:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ParallelTrainer(
...     wrapper=wrapper,
...     run_config=parallel_config,
... )
>>> ablator.launch(working_directory = os.getcwd(), ray_head_address="auto")
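The search space in the workflow above addresses nested configuration fields with dotted keys such as "train_config.optimizer_config.arguments.lr". The following is a minimal sketch of that idea in plain Python, not ablator's implementation: it assumes plain dictionaries in place of the config classes and SearchSpace objects, and it draws values uniformly at random, whereas the trainer manages its trials through optuna. The helper names (sample_value, set_by_path, sample_trial_config) are hypothetical.

```python
import copy
import random

# Hypothetical stand-in for a nested run configuration (not ablator's config classes).
base_config = {
    "train_config": {
        "optimizer_config": {"arguments": {"lr": 0.5, "weight_decay": 0.5}}
    },
    "model_config": {"hidden_size": 32, "activation": "relu"},
}

# Mirrors the search space from the example, using plain dicts instead of SearchSpace.
search_space = {
    "train_config.optimizer_config.arguments.lr": {
        "value_range": [0.001, 0.01],
        "value_type": "float",
    },
    "model_config.hidden_size": {"value_range": [32, 64], "value_type": "int"},
    "model_config.activation": {"categorical_values": ["relu", "elu", "leakyRelu"]},
}


def sample_value(space, rng):
    """Draw one value uniformly from a categorical list or a numeric range."""
    if "categorical_values" in space:
        return rng.choice(space["categorical_values"])
    low, high = space["value_range"]
    if space["value_type"] == "int":
        return rng.randint(int(low), int(high))
    return rng.uniform(float(low), float(high))


def set_by_path(config, dotted_key, value):
    """Follow the dotted key through the nested dict and set the leaf value."""
    *parents, leaf = dotted_key.split(".")
    node = config
    for name in parents:
        node = node[name]
    node[leaf] = value


def sample_trial_config(base, space, rng):
    """Return a copy of the base config with every searched field re-sampled."""
    trial = copy.deepcopy(base)
    for dotted_key, sub_space in space.items():
        set_by_path(trial, dotted_key, sample_value(sub_space, rng))
    return trial


rng = random.Random(42)
trial_config = sample_trial_config(base_config, search_space, rng)
print(trial_config)
```

Fields that are not listed in the search space (here weight_decay) keep the values from the base configuration, so each trial differs from the others only in the searched hyperparameters.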
Attributes:
run_config : ParallelConfig

Running configuration for parallel training.

device : str

The device to use for training.

experiment_dir : Path

The directory that stores experiment information (optuna storage, experiment state database).

logger : RemoteFileLogger

A centralized logger that writes messages to a file and prints them to the console.

experiment_state : ExperimentState

Manages the optuna trials.

total_trials : int

Number of trials to run.

gpu_mem_bottleneck : int

The smallest memory capacity among all available GPUs.

cpu : float

The number of CPUs used per trial.

gpu : float

The number of GPUs used per trial.

launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = None, resume: bool = False, excluding_files: list[str] | None = None)[source]#

Set up and launch the parallel ablation process. This sets up a ray cluster; trials with different hyperparameters are initialized (or retrieved) and pushed to ray nodes so that they can be executed in parallel.

Parameters:
working_directory : str

The working directory containing the code and modules that ray will use.

auxilary_modules : list[types.ModuleType], None

A list of modules to include in the ray cluster's working environment.

ray_head_address : str, None

Ray cluster address.

resume : bool, default=False

Whether to resume training the model from existing checkpoints and existing experiment state.

excluding_files : list[str], None

A list of file patterns in .gitignore format to exclude from the upload to the ray cluster. If unspecified, the .git/** folder is excluded.
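To illustrate the effect of excluding_files, here is a rough sketch of pattern-based exclusion using only the standard library. It is an approximation, not what launch() does internally: fnmatch-style matching covers only part of the .gitignore syntax (no negation or anchoring rules), the helper files_to_upload is hypothetical, and it assumes that a user-supplied list replaces the default .git/** pattern rather than extending it.

```python
from fnmatch import fnmatchcase

# Default taken from the parameter description above.
DEFAULT_EXCLUDES = [".git/**"]


def files_to_upload(paths, excluding_files=None):
    """Keep only the paths that match none of the exclusion patterns.

    Assumption: a user-supplied list replaces the default instead of extending it.
    """
    patterns = DEFAULT_EXCLUDES if excluding_files is None else excluding_files
    return [
        path
        for path in paths
        if not any(fnmatchcase(path, pattern) for pattern in patterns)
    ]


paths = [".git/config", ".git/objects/ab/cdef", "train.py", "data/cache.bin"]
print(files_to_upload(paths))                         # ['train.py', 'data/cache.bin']
print(files_to_upload(paths, ["data/**", ".git/**"]))  # ['train.py']
```

Excluding large data or cache folders in this way keeps the working directory that is shipped to the cluster small.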

pre_train_setup()[source]#

Used to prepare resources to avoid stalling during training or when resources are shared between trainers.

property total_trials#

ablator.main.mp.train_main_remote(model: ModelWrapper, run_config: ParallelConfig, mp_logger: RemoteFileLogger, uid: int, fault_tollerant: bool = True, crash_exceptions_types: list[type] | None = None, resume: bool = False, clean_reset: bool = False, progress_bar: RemoteProgressBar | None = None) → tuple[int, dict[str, float] | None, TrialState][source]#

The trial job that is executed remotely on a ray node. This is where model training happens. In addition, the experiment directory is synchronized to Google Cloud Storage and to remote nodes via the rsync_up() methods of GcpConfig and RemoteConfig. Refer to the documentation of these two classes for more details.

Parameters:
model : ModelWrapper

The ModelWrapper that is used to train a model.

run_config : ParallelConfig

Runtime configuration for this trial.

mp_logger : RemoteFileLogger

The file logger that is used to log training progress.

uid : int

The trial's unique identifier.

fault_tollerant : bool, optional, default=True

Whether to tolerate crashes, i.e. to report the trial as failed instead of ceasing execution when the ray job crashes.

crash_exceptions_types : list[type], None, optional, default=None

Types of exceptions that are considered crashes.

resume : bool, default=False

Whether to resume training the model from existing checkpoints and existing experiment state.

clean_reset : bool, default=False

Whether to remove the model directory when CheckpointNotFoundError is raised.

progress_bar : RemoteProgressBar, optional

An optional remote progress bar that is updated with the results of the trial.

Returns:
int

The trial uid corresponding to the results.

dict[str, float], None

A dictionary of metrics, or None if an exception other than LossDivergedError or TrainPlateauError was raised.

TrialState

A TrialState object indicating the state of the trial job:

  • If LossDivergedError or TrainPlateauError is raised while training, the returned state is TrialState.PRUNED_POOR_PERFORMANCE.

  • If RuntimeError (with message 'CUDA out of memory') or CheckpointNotFoundError (with clean_reset=True) is raised while training, the returned state is TrialState.FAIL_RECOVERABLE.

  • If any other type of error, or CheckpointNotFoundError (with clean_reset=False), is raised, the returned state is TrialState.FAIL.
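The mapping from exceptions to trial states described above can be summarized as a small decision function. This is a sketch under stated assumptions rather than the library's code: TrialState and the three exception classes are local stand-ins that only mirror the names used in this page, state_for_exception is a hypothetical helper, and the out-of-memory case is detected with a substring check on the error message.

```python
from enum import Enum


class TrialState(Enum):
    """Stand-in enum listing only the states named in the Returns section."""

    PRUNED_POOR_PERFORMANCE = "pruned_poor_performance"
    FAIL_RECOVERABLE = "fail_recoverable"
    FAIL = "fail"


# Stand-in exception classes; the real ones are defined by the library.
class LossDivergedError(Exception):
    pass


class TrainPlateauError(Exception):
    pass


class CheckpointNotFoundError(Exception):
    pass


def state_for_exception(error, clean_reset=False):
    """Return the trial state that the rules above assign to a raised exception."""
    if isinstance(error, (LossDivergedError, TrainPlateauError)):
        return TrialState.PRUNED_POOR_PERFORMANCE
    # Assumption: the out-of-memory case is recognized by its message text.
    if isinstance(error, RuntimeError) and "CUDA out of memory" in str(error):
        return TrialState.FAIL_RECOVERABLE
    if isinstance(error, CheckpointNotFoundError) and clean_reset:
        return TrialState.FAIL_RECOVERABLE
    return TrialState.FAIL


print(state_for_exception(LossDivergedError("loss is nan")))
print(state_for_exception(RuntimeError("CUDA out of memory")))
print(state_for_exception(CheckpointNotFoundError("missing"), clean_reset=False))
```

Note that the same CheckpointNotFoundError leads to a recoverable or a final failure depending only on clean_reset.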

ablator.main.proto module#

class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)[source]#

Bases: object

Manages resources for prototyping. This trainer runs an experiment on a single prototype model, so no HPO is performed.

Raises:
RuntimeError

If experiment directory is not defined in the running configuration.

Examples

Below is a complete workflow on how to launch a prototype experiment with ProtoTrainer, from defining config to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config,
...     rand_weights_init = True
... )
  • Define the model config. Here we use the default one, with no custom hyperparameters (a custom model config is typically needed only when running HPO over the model’s hyperparameters with `ParallelTrainer`, which requires `ParallelConfig` instead of `RunConfig`):

>>> model_config = ModelConfig()
  • Define run config:

>>> run_config = RunConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments",
...     device="cpu",
...     amp=False,
...     random_seed = 42
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • After gathering all configurations and the model wrapper, we can initialize and launch the prototype trainer:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ProtoTrainer(
...     wrapper=wrapper,
...     run_config=run_config,
... )
>>> metrics = ablator.launch()
Attributes:
wrapper : ModelWrapper

The main model wrapper.

run_config : RunConfig

Running configuration for the model.

evaluate()[source]#

Run model evaluation on the training results and sync the evaluation results to external logging services (e.g. Google Cloud Storage, other remote servers).

Returns:
metrics : Metrics

Metrics returned after evaluation.

launch(debug: bool = False)[source]#

Launch the prototype experiment (train, evaluate the single prototype model) and return metrics.

Parameters:
debug : bool, default=False

Whether to train the model in debug mode.

Returns:
metrics : Metrics

Metrics returned after training.

pre_train_setup()[source]#

Used to prepare resources to avoid stalling during training or when resources are shared between trainers.

smoke_test(config=None)[source]#

Run a smoke test training process on the model.

Parameters:
config : RunConfig, None

Running configuration for the model.

Module contents#