import math

from torch import inf
from torch.optim.optimizer import Optimizer


class ReduceLROnPlateauWithWarmup(object):
    """Reduce learning rate when a metric has stopped improving.

    Models often benefit from reducing the learning rate by a factor of 2-10
    once learning stagnates. This scheduler reads a metrics quantity and if no
    improvement is seen for a 'patience' number of epochs, the learning rate
    is reduced.

    Args:
        optimizer (Optimizer): Wrapped optimizer.
        mode (str): One of `min`, `max`. In `min` mode, lr will be reduced
            when the quantity monitored has stopped decreasing; in `max` mode
            it will be reduced when the quantity monitored has stopped
            increasing. Default: 'min'.
        factor (float): Factor by which the learning rate will be reduced.
            new_lr = lr * factor. Default: 0.1.
        patience (int): Number of epochs with no improvement after which
            learning rate will be reduced. For example, if `patience = 2`,
            then we will ignore the first 2 epochs with no improvement, and
            will only decrease the LR after the 3rd epoch if the loss still
            hasn't improved then. Default: 10.
        threshold (float): Threshold for measuring the new optimum, to only
            focus on significant changes. Default: 1e-4.
        threshold_mode (str): One of `rel`, `abs`. In `rel` mode,
            dynamic_threshold = best * ( 1 + threshold ) in 'max' mode or
            best * ( 1 - threshold ) in `min` mode. In `abs` mode,
            dynamic_threshold = best + threshold in `max` mode or
            best - threshold in `min` mode. Default: 'rel'.
        cooldown (int): Number of epochs to wait before resuming normal
            operation after lr has been reduced. Default: 0.
        min_lr (float or list): A scalar or a list of scalars. A lower bound
            on the learning rate of all param groups or each group
            respectively. Default: 0.
        eps (float): Minimal decay applied to lr. If the difference between
            new and old lr is smaller than eps, the update is ignored.
            Default: 1e-8.
        verbose (bool): If ``True``, prints a message to stdout for each
            update. Default: ``False``.
        warmup_lr (float or list or None): The learning rate to be reached at
            the end of warmup. Default: ``None``.
        warmup (int): The number of steps to warm up for. Default: 0.
    """

    def __init__(
        self,
        optimizer,
        mode="min",
        factor=0.1,
        patience=10,
        threshold=1e-4,
        threshold_mode="rel",
        cooldown=0,
        min_lr=0,
        eps=1e-8,
        verbose=False,
        warmup_lr=None,
        warmup=0,
    ):
        if factor >= 1.0:
            raise ValueError("Factor should be < 1.0.")
        self.factor = factor

        # Attach optimizer
        if not isinstance(optimizer, Optimizer):
            raise TypeError("{} is not an Optimizer".format(type(optimizer).__name__))
        self.optimizer = optimizer

        if isinstance(min_lr, (list, tuple)):
            if len(min_lr) != len(optimizer.param_groups):
                raise ValueError(
                    "expected {} min_lrs, got {}".format(
                        len(optimizer.param_groups), len(min_lr)
                    )
                )
            self.min_lrs = list(min_lr)
        else:
            self.min_lrs = [min_lr] * len(optimizer.param_groups)

        self.patience = patience
        self.verbose = verbose
        self.cooldown = cooldown
        self.cooldown_counter = 0
        self.mode = mode
        self.threshold = threshold
        self.threshold_mode = threshold_mode
        self.warmup_lr = warmup_lr
        self.warmup = warmup

        self.best = None
        self.num_bad_epochs = None
        self.mode_worse = None  # the worse value for the chosen mode
        self.eps = eps
        self.last_epoch = 0
        self._init_is_better(
            mode=mode, threshold=threshold, threshold_mode=threshold_mode
        )
        self._reset()

    def _prepare_for_warmup(self):
        if self.warmup_lr is not None:
            if isinstance(self.warmup_lr, (list, tuple)):
                if len(self.warmup_lr) != len(self.optimizer.param_groups):
                    raise ValueError(
                        "expected {} warmup_lrs, got {}".format(
                            len(self.optimizer.param_groups), len(self.warmup_lr)
                        )
                    )
                self.warmup_lrs = list(self.warmup_lr)
            else:
                self.warmup_lrs = [self.warmup_lr] * len(self.optimizer.param_groups)
        else:
            self.warmup_lrs = None

        if self.warmup > self.last_epoch:
            curr_lrs = [group["lr"] for group in self.optimizer.param_groups]
            # Per-step increment so that each group's lr reaches its warmup_lr
            # after `warmup` steps of linear warmup.
            self.warmup_lr_steps = [
                max(0, (self.warmup_lrs[i] - curr_lrs[i]) / float(self.warmup))
                for i in range(len(curr_lrs))
            ]
        else:
            self.warmup_lr_steps = None

    def _reset(self):
        """Resets num_bad_epochs counter and cooldown counter."""
        self.best = self.mode_worse
        self.cooldown_counter = 0
        self.num_bad_epochs = 0

    def step(self, metrics):
        # convert `metrics` to float, in case it's a zero-dim Tensor
        current = float(metrics)
        epoch = self.last_epoch + 1
        self.last_epoch = epoch

        if epoch <= self.warmup:
            self._increase_lr(epoch)
        else:
            if self.is_better(current, self.best):
                self.best = current
                self.num_bad_epochs = 0
            else:
                self.num_bad_epochs += 1

            if self.in_cooldown:
                self.cooldown_counter -= 1
                self.num_bad_epochs = 0  # ignore any bad epochs in cooldown

            if self.num_bad_epochs > self.patience:
                self._reduce_lr(epoch)
                self.cooldown_counter = self.cooldown
                self.num_bad_epochs = 0

        self._last_lr = [group["lr"] for group in self.optimizer.param_groups]

    def _reduce_lr(self, epoch):
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = max(old_lr * self.factor, self.min_lrs[i])
            if old_lr - new_lr > self.eps:
                param_group["lr"] = new_lr
                if self.verbose:
                    print(
                        "Epoch {:5d}: reducing learning rate"
                        " of group {} to {:.4e}.".format(epoch, i, new_lr)
                    )

    def _increase_lr(self, epoch):
        # used for warmup
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = max(old_lr + self.warmup_lr_steps[i], self.min_lrs[i])
            param_group["lr"] = new_lr
            if self.verbose:
                print(
                    "Epoch {:5d}: increasing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    @property
    def in_cooldown(self):
        return self.cooldown_counter > 0

    def is_better(self, a, best):
        if self.mode == "min" and self.threshold_mode == "rel":
            rel_epsilon = 1.0 - self.threshold
            return a < best * rel_epsilon
        elif self.mode == "min" and self.threshold_mode == "abs":
            return a < best - self.threshold
        elif self.mode == "max" and self.threshold_mode == "rel":
            rel_epsilon = self.threshold + 1.0
            return a > best * rel_epsilon
        else:  # mode == 'max' and threshold_mode == 'abs':
            return a > best + self.threshold

    def _init_is_better(self, mode, threshold, threshold_mode):
        if mode not in {"min", "max"}:
            raise ValueError("mode " + mode + " is unknown!")
        if threshold_mode not in {"rel", "abs"}:
            raise ValueError("threshold mode " + threshold_mode + " is unknown!")

        if mode == "min":
            self.mode_worse = inf
        else:  # mode == 'max':
            self.mode_worse = -inf

        self.mode = mode
        self.threshold = threshold
        self.threshold_mode = threshold_mode

        self._prepare_for_warmup()

    def state_dict(self):
        return {
            key: value for key, value in self.__dict__.items() if key != "optimizer"
        }

    def load_state_dict(self, state_dict):
        self.__dict__.update(state_dict)
        self._init_is_better(
            mode=self.mode, threshold=self.threshold, threshold_mode=self.threshold_mode
        )
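

# --- Usage sketch (not part of the scheduler implementation) ---
# A minimal, hedged example of how ReduceLROnPlateauWithWarmup can be driven.
# The toy model, the SGD optimizer, and the warmup_lr/warmup/patience values
# below are illustrative assumptions, not values prescribed by this module.
def _example_reduce_lr_on_plateau_with_warmup():
    import torch

    model = torch.nn.Linear(10, 1)  # toy model, for illustration only
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
    scheduler = ReduceLROnPlateauWithWarmup(
        optimizer,
        mode="min",
        factor=0.5,
        patience=5,
        warmup_lr=1e-2,  # lr ramps linearly from 1e-4 toward 1e-2 ...
        warmup=10,       # ... over the first 10 scheduler steps
    )
    for epoch in range(100):
        val_loss = torch.rand(1)  # placeholder for a real validation metric
        scheduler.step(val_loss)  # pass the monitored metric once per epoch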


class CosineAnnealingLRWithWarmup(object):
    """Cosine-annealing learning-rate schedule with linear warmup.

    Args:
        warmup_lr (float or list or None): The learning rate to be reached at
            the end of warmup. Default: ``None``.
        warmup (int): The number of steps to warm up for. Default: 0.
    """

    def __init__(
        self,
        optimizer,
        T_max,
        last_epoch=-1,
        verbose=False,
        min_lr=0,
        warmup_lr=None,
        warmup=0,
    ):
        self.optimizer = optimizer
        self.T_max = T_max
        self.last_epoch = last_epoch
        self.verbose = verbose
        self.warmup_lr = warmup_lr
        self.warmup = warmup

        if isinstance(min_lr, (list, tuple)):
            if len(min_lr) != len(optimizer.param_groups):
                raise ValueError(
                    "expected {} min_lrs, got {}".format(
                        len(optimizer.param_groups), len(min_lr)
                    )
                )
            self.min_lrs = list(min_lr)
        else:
            self.min_lrs = [min_lr] * len(optimizer.param_groups)

        # Peak lrs for the cosine decay; they start at min_lr and are raised
        # to the highest lr observed during warmup in `_increase_lr`.
        self.max_lrs = [lr for lr in self.min_lrs]

        self._prepare_for_warmup()

    def step(self):
        epoch = self.last_epoch + 1
        self.last_epoch = epoch

        if epoch <= self.warmup:
            self._increase_lr(epoch)
        else:
            self._reduce_lr(epoch)

    def _reduce_lr(self, epoch):
        for i, param_group in enumerate(self.optimizer.param_groups):
            # Cosine factor decays from 1 to 0 as epoch goes from warmup to T_max.
            progress = float(epoch - self.warmup) / float(
                max(1, self.T_max - self.warmup)
            )
            factor = max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))
            new_lr = max(self.max_lrs[i] * factor, self.min_lrs[i])
            param_group["lr"] = new_lr
            if self.verbose:
                print(
                    "Epoch {:5d}: reducing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    def _increase_lr(self, epoch):
        # used for warmup
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = old_lr + self.warmup_lr_steps[i]
            param_group["lr"] = new_lr
            self.max_lrs[i] = max(self.max_lrs[i], new_lr)
            if self.verbose:
                print(
                    "Epoch {:5d}: increasing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    def _prepare_for_warmup(self):
        if self.warmup_lr is not None:
            if isinstance(self.warmup_lr, (list, tuple)):
                if len(self.warmup_lr) != len(self.optimizer.param_groups):
                    raise ValueError(
                        "expected {} warmup_lrs, got {}".format(
                            len(self.optimizer.param_groups), len(self.warmup_lr)
                        )
                    )
                self.warmup_lrs = list(self.warmup_lr)
            else:
                self.warmup_lrs = [self.warmup_lr] * len(self.optimizer.param_groups)
        else:
            self.warmup_lrs = None

        if self.warmup > self.last_epoch:
            curr_lrs = [group["lr"] for group in self.optimizer.param_groups]
            # Per-step increment so that each group's lr reaches its warmup_lr
            # after `warmup` steps of linear warmup.
            self.warmup_lr_steps = [
                max(0, (self.warmup_lrs[i] - curr_lrs[i]) / float(self.warmup))
                for i in range(len(curr_lrs))
            ]
        else:
            self.warmup_lr_steps = None
    def state_dict(self):
        return {
            key: value for key, value in self.__dict__.items() if key != "optimizer"
        }

    def load_state_dict(self, state_dict):
        self.__dict__.update(state_dict)
        self._prepare_for_warmup()
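

# --- Usage sketch (not part of the scheduler implementation) ---
# A minimal, hedged example of CosineAnnealingLRWithWarmup. The T_max, min_lr,
# warmup_lr, and warmup values are assumptions chosen for illustration. Unlike
# the plateau scheduler above, step() takes no metric and is called once per
# epoch; after warmup the lr follows a cosine decay from its warmup peak.
def _example_cosine_annealing_with_warmup():
    import torch

    model = torch.nn.Linear(10, 1)  # toy model, for illustration only
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
    scheduler = CosineAnnealingLRWithWarmup(
        optimizer,
        T_max=100,       # total number of scheduler steps, including warmup
        min_lr=1e-6,
        warmup_lr=1e-2,  # lr ramps linearly from 1e-4 toward 1e-2 ...
        warmup=10,       # ... over the first 10 steps, then cosine-decays
    )
    for epoch in range(100):
        # ... run one training epoch here ...
        scheduler.step()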