ablator.main package
Subpackages
- ablator.main.hpo package
- ablator.main.model package
  - Submodules
    - ablator.main.model.main module
      - CheckpointNotFoundError
      - EvaluationError
      - LogStepError
      - ModelBase: ModelBase.checkpoint(), ModelBase.config_parser(), ModelBase.create_model(), ModelBase.current_epoch, ModelBase.epoch_len, ModelBase.eval_itr, ModelBase.evaluate(), ModelBase.evaluation_functions(), ModelBase.load_checkpoint(), ModelBase.log_itr, ModelBase.make_dataloaders(), ModelBase.save_dict(), ModelBase.train(), ModelBase.train_stats, ModelBase.uid
      - TrainPlateauError
    - ablator.main.model.wrapper module
      - ModelWrapper: ModelWrapper.apply_loss(), ModelWrapper.checkpoint(), ModelWrapper.config_parser(), ModelWrapper.create_model(), ModelWrapper.create_optimizer(), ModelWrapper.create_scaler(), ModelWrapper.create_scheduler(), ModelWrapper.epochs, ModelWrapper.eval(), ModelWrapper.evaluate(), ModelWrapper.evaluation_functions(), ModelWrapper.load_checkpoint(), ModelWrapper.log(), ModelWrapper.log_step(), ModelWrapper.make_dataloader_test(), ModelWrapper.make_dataloader_train(), ModelWrapper.make_dataloader_val(), ModelWrapper.make_dataloaders(), ModelWrapper.metrics, ModelWrapper.mock_train(), ModelWrapper.model_config, ModelWrapper.model_step(), ModelWrapper.reset_optimizer_scheduler(), ModelWrapper.save_dict(), ModelWrapper.status_message(), ModelWrapper.to_device(), ModelWrapper.total_steps, ModelWrapper.train(), ModelWrapper.train_config, ModelWrapper.train_loop(), ModelWrapper.train_step(), ModelWrapper.update_status(), ModelWrapper.validation_loop()
  - Module contents
- ablator.main.state package
Submodules
ablator.main.mp module
- class ablator.main.mp.ParallelTrainer(wrapper: ModelWrapper, run_config: ParallelConfig)
  Bases: ProtoTrainer
  A class for parallelizing the training and hyperparameter optimization (HPO) of models with different configurations using ray.
  Examples
  Below is a complete workflow for launching a parallel experiment with ParallelTrainer, from defining the configs and preparing the model wrapper to launching the experiment.
  Define training config:
  >>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
  >>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
  >>> train_config = TrainConfig(
  ...     dataset="[Dataset Name]",
  ...     batch_size=32,
  ...     epochs=10,
  ...     optimizer_config=my_optimizer_config,
  ...     scheduler_config=my_scheduler_config,
  ...     rand_weights_init=True,
  ... )
  Define the model config. Here we want to run HPO on the activation function and the model's hidden size:
  >>> @configclass
  ... class CustomModelConfig(ModelConfig):
  ...     hidden_size: int
  ...     activation: str
  >>> model_config = CustomModelConfig(hidden_size=32, activation="relu")
Define search space:
  >>> search_space = {
  ...     "train_config.optimizer_config.arguments.lr": SearchSpace(
  ...         value_range=[0.001, 0.01],
  ...         value_type="float",
  ...     ),
  ...     "model_config.hidden_size": SearchSpace(value_range=[32, 64], value_type="int"),
  ...     "model_config.activation": SearchSpace(categorical_values=["relu", "elu", "leakyRelu"]),
  ... }
  Define run config (remember to subclass ParallelConfig so that its model config type is CustomModelConfig):
  >>> @configclass
  ... class CustomParallelConfig(ParallelConfig):
  ...     model_config: CustomModelConfig
  >>>
  >>> parallel_config = CustomParallelConfig(
  ...     train_config=train_config,
  ...     model_config=model_config,
  ...     metrics_n_batches=800,
  ...     experiment_dir="/tmp/experiments/",
  ...     device="cuda",
  ...     amp=True,
  ...     random_seed=42,
  ...     total_trials=20,
  ...     concurrent_trials=20,
  ...     search_space=search_space,
  ...     optim_metrics={"val_loss": "min"},
  ...     gpu_mb_per_experiment=1024,
  ...     cpus_per_experiment=1,
  ... )
Create model wrapper:
  >>> import torch
  >>> class MyModelWrapper(ModelWrapper):
  ...     def __init__(self, *args, **kwargs):
  ...         super().__init__(*args, **kwargs)
  ...
  ...     def make_dataloader_train(self, run_config: CustomParallelConfig):
  ...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
  ...
  ...     def make_dataloader_val(self, run_config: CustomParallelConfig):
  ...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  With the configurations and the model wrapper in place, we can initialize and launch the parallel trainer:
  >>> import os
  >>> wrapper = MyModelWrapper(
  ...     model_class=<your_ModelModule_class>,
  ... )
  >>> ablator = ParallelTrainer(
  ...     wrapper=wrapper,
  ...     run_config=parallel_config,
  ... )
  >>> ablator.launch(working_directory=os.getcwd(), ray_head_address="auto")
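The search-space keys in the example above are dotted paths into the run config. As a rough illustration of that idea, the sketch below applies one sampled value per key to a nested configuration. It assumes plain dictionaries in place of ablator's config classes; `set_by_path` and `sample_trial` are hypothetical helpers that are not part of ablator, and the uniform sampling is only a stand-in for whatever sampler the HPO backend actually uses.

```python
import copy
import random


def set_by_path(config: dict, dotted_key: str, value) -> None:
    """Assign `value` at a dotted path such as 'model_config.hidden_size'."""
    *parents, leaf = dotted_key.split(".")
    node = config
    for name in parents:
        node = node[name]  # raises KeyError if the path does not exist
    node[leaf] = value


def sample_trial(base_config: dict, search_space: dict, rng: random.Random) -> dict:
    """Return a copy of `base_config` with one sampled value per search-space key."""
    trial = copy.deepcopy(base_config)  # leave the base config untouched
    for dotted_key, space in search_space.items():
        if "categorical_values" in space:
            value = rng.choice(space["categorical_values"])
        elif space["value_type"] == "int":
            low, high = space["value_range"]
            value = rng.randint(low, high)  # both bounds inclusive
        else:
            low, high = space["value_range"]
            value = rng.uniform(low, high)
        set_by_path(trial, dotted_key, value)
    return trial


# Plain-dict stand-ins for the configs defined in the example (assumption).
base_config = {
    "train_config": {"optimizer_config": {"arguments": {"lr": 0.5, "weight_decay": 0.5}}},
    "model_config": {"hidden_size": 32, "activation": "relu"},
}
search_space = {
    "train_config.optimizer_config.arguments.lr": {"value_range": [0.001, 0.01], "value_type": "float"},
    "model_config.hidden_size": {"value_range": [32, 64], "value_type": "int"},
    "model_config.activation": {"categorical_values": ["relu", "elu", "leakyRelu"]},
}

trial_config = sample_trial(base_config, search_space, random.Random(42))
```

Each trial gets its own deep copy, so parameters that are not in the search space (here `weight_decay`) keep their base values.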
  - Attributes:
    - run_config : ParallelConfig
      Running configuration for parallel training.
    - device : str
      The device to use for training.
    - experiment_dir : Path
      The directory that stores experiment information (optuna storage, experiment state database).
    - logger : RemoteFileLogger
      A centralized logger that writes messages to a file and prints them to the console.
    - experiment_state : ExperimentState
      Manages the optuna trials.
    - total_trials : int
      Number of trials to run.
    - gpu_mem_bottleneck : int
      The minimum memory capacity across all available GPUs.
    - cpu : float
      The number of CPUs used per trial.
    - gpu : float
      The number of GPUs used per trial.
  - launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = None, resume: bool = False, excluding_files: list[str] | None = None)
    Set up and launch the parallel ablation process. This sets up a ray cluster; trials with different hyperparameters are initialized (or retrieved) and pushed to ray nodes so that they can be executed in parallel.
    - Parameters:
      - working_directory : str
        The working directory that stores the code and modules that will be used by ray.
      - auxilary_modules : list[types.ModuleType], None
        A list of modules to be used as the ray cluster's working environment.
      - ray_head_address : str, None
        Ray cluster address.
      - resume : bool, default=False
        Whether to resume training the model from existing checkpoints and existing experiment state.
      - excluding_files : list[str], None
        A list of file patterns in .gitignore format that will be excluded from the upload to the ray cluster. If unspecified, the .git/** folder is ignored.
  - pre_train_setup()
    Used to prepare resources to avoid stalling during training or when resources are shared between trainers.
  - property total_trials
- ablator.main.mp.train_main_remote(model: ablator.main.model.wrapper.ModelWrapper, run_config: ablator.config.mp.ParallelConfig, mp_logger: ablator.modules.loggers.file.RemoteFileLogger, uid: int, fault_tollerant: bool = True, crash_exceptions_types: list[type] | None = None, resume: bool = False, clean_reset: bool = False, progress_bar: RemoteProgressBar | None = None) → tuple[int, dict[str, float] | None, ablator.main.state.store.TrialState]
  The trial job that is executed remotely on a ray node. This is where model training happens. In addition, the experiment directory is synchronized to Google Cloud Storage and to remote nodes. Synchronization is done via the rsync_up() methods of GcpConfig and RemoteConfig. Refer to the documentation of these two classes for more details.
  - Parameters:
    - model : ModelWrapper
      The ModelWrapper that is used to train a model.
    - run_config : ParallelConfig
      Runtime configuration for this trial.
    - mp_logger : RemoteFileLogger
      The file logger that is used to log training progress.
    - uid : int
      The unique identifier of the trial.
    - fault_tollerant : bool, optional, default=True
      Whether to tolerate crashes, i.e. to cease execution when the ray job crashes.
    - crash_exceptions_types : list[type], None, optional, default=None
      Types of exceptions that are considered crashes.
    - resume : bool, default=False
      Whether to resume training the model from existing checkpoints and existing experiment state.
    - clean_reset : bool, default=False
      Whether to remove the model directory when CheckpointNotFoundError is raised.
    - progress_bar : RemoteProgressBar, optional
      An optional remote progress bar that is updated with the results of the trial.
  - Returns:
    - int
      The trial uid corresponding to the results.
    - dict[str, float], None
      If an exception is raised (other than LossDivergedError and TrainPlateauError), this will be None. Otherwise, this will be a dictionary of metrics.
    - TrialState
      A TrialState object indicating the state of the trial job:
      - If LossDivergedError or TrainPlateauError is raised while training, the returned state will be TrialState.PRUNED_POOR_PERFORMANCE.
      - If RuntimeError (with message 'CUDA out of memory') or CheckpointNotFoundError (with clean_reset=True) is raised while training, the returned state will be TrialState.FAIL_RECOVERABLE.
      - If any other type of error, or CheckpointNotFoundError (with clean_reset=False), is raised, the returned state will be TrialState.FAIL.
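The mapping from exceptions to trial states described above can be summarized in a few lines. The following is a minimal sketch of that decision table only, not ablator's implementation: the exception classes and the `TrialState` enum are local stand-ins (their member values are made up for illustration), and `classify_failure` is a hypothetical helper name.

```python
from enum import Enum


class TrialState(Enum):
    """Stand-in for ablator's TrialState; only the members named above, with made-up values."""

    PRUNED_POOR_PERFORMANCE = "pruned_poor_performance"
    FAIL_RECOVERABLE = "fail_recoverable"
    FAIL = "fail"


# Stand-in exception types (assumption); the real ones are defined in ablator.
class LossDivergedError(Exception):
    """Raised when the training loss diverges."""


class TrainPlateauError(Exception):
    """Raised when training stops improving."""


class CheckpointNotFoundError(Exception):
    """Raised when no checkpoint can be found to resume from."""


def classify_failure(error: Exception, clean_reset: bool = False) -> TrialState:
    """Map an exception raised during training to the documented trial state."""
    if isinstance(error, (LossDivergedError, TrainPlateauError)):
        return TrialState.PRUNED_POOR_PERFORMANCE
    if isinstance(error, RuntimeError) and "CUDA out of memory" in str(error):
        return TrialState.FAIL_RECOVERABLE
    if isinstance(error, CheckpointNotFoundError) and clean_reset:
        return TrialState.FAIL_RECOVERABLE
    # Any other error, including CheckpointNotFoundError with clean_reset=False.
    return TrialState.FAIL
```

For example, `classify_failure(CheckpointNotFoundError(), clean_reset=True)` yields the recoverable state, while the same error with `clean_reset=False` falls through to `TrialState.FAIL`.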
ablator.main.proto module
- class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)
  Bases: object
  Manages resources for prototyping. This trainer runs an experiment on a single prototype model (and therefore performs no HPO).
  - Raises:
    - RuntimeError
      If the experiment directory is not defined in the running configuration.
  Examples
  Below is a complete workflow for launching a prototype experiment with ProtoTrainer, from defining the configs to launching the experiment.
  Define training config:
  >>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
  >>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
  >>> train_config = TrainConfig(
  ...     dataset="[Dataset Name]",
  ...     batch_size=32,
  ...     epochs=10,
  ...     optimizer_config=my_optimizer_config,
  ...     scheduler_config=my_scheduler_config,
  ...     rand_weights_init=True,
  ... )
  Define the model config. Here we use the default one with no custom hyperparameters. (A custom model config is typically needed when running HPO on your model's hyperparameters in parallel experiments with `ParallelTrainer`, which requires `ParallelConfig` instead of `RunConfig`.)
  >>> model_config = ModelConfig()
Define run config:
  >>> run_config = RunConfig(
  ...     train_config=train_config,
  ...     model_config=model_config,
  ...     metrics_n_batches=800,
  ...     experiment_dir="/tmp/experiments",
  ...     device="cpu",
  ...     amp=False,
  ...     random_seed=42,
  ... )
Create model wrapper:
  >>> import torch
  >>> class MyModelWrapper(ModelWrapper):
  ...     def __init__(self, *args, **kwargs):
  ...         super().__init__(*args, **kwargs)
  ...
  ...     def make_dataloader_train(self, run_config: RunConfig):
  ...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
  ...
  ...     def make_dataloader_val(self, run_config: RunConfig):
  ...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  With the configurations and the model wrapper in place, we can initialize and launch the prototype trainer:
  >>> wrapper = MyModelWrapper(
  ...     model_class=<your_ModelModule_class>,
  ... )
  >>> ablator = ProtoTrainer(
  ...     wrapper=wrapper,
  ...     run_config=run_config,
  ... )
  >>> metrics = ablator.launch()
  - Attributes:
    - wrapper : ModelWrapper
      The main model wrapper.
    - run_config : RunConfig
      Running configuration for the model.
  - evaluate()
    Run model evaluation on the training results and sync the evaluation results to external logging services (e.g. Google Cloud Storage or other remote servers).
    - Returns:
      - metrics : Metrics
        Metrics returned after evaluation.
  - launch(debug: bool = False)
    Launch the prototype experiment (train and evaluate the single prototype model) and return the metrics.
    - Parameters:
      - debug : bool, default=False
        Whether to train the model in debug mode.
    - Returns:
      - metrics : Metrics
        Metrics returned after training.