import math

from torch import inf
from torch.optim.optimizer import Optimizer


class ReduceLROnPlateauWithWarmup(object):
"""Reduce learning rate when a metric has stopped improving. | |
Models often benefit from reducing the learning rate by a factor | |
of 2-10 once learning stagnates. This scheduler reads a metrics | |
quantity and if no improvement is seen for a 'patience' number | |
of epochs, the learning rate is reduced. | |
    Args:
        optimizer (Optimizer): Wrapped optimizer.
        mode (str): One of `min`, `max`. In `min` mode, lr will
            be reduced when the quantity monitored has stopped
            decreasing; in `max` mode it will be reduced when the
            quantity monitored has stopped increasing. Default: 'min'.
        factor (float): Factor by which the learning rate will be
            reduced. new_lr = lr * factor. Default: 0.1.
        patience (int): Number of epochs with no improvement after
            which learning rate will be reduced. For example, if
            `patience = 2`, then we will ignore the first 2 epochs
            with no improvement, and will only decrease the LR after the
            3rd epoch if the loss still hasn't improved then.
            Default: 10.
        threshold (float): Threshold for measuring the new optimum,
            to only focus on significant changes. Default: 1e-4.
        threshold_mode (str): One of `rel`, `abs`. In `rel` mode,
            dynamic_threshold = best * ( 1 + threshold ) in `max`
            mode or best * ( 1 - threshold ) in `min` mode.
            In `abs` mode, dynamic_threshold = best + threshold in
            `max` mode or best - threshold in `min` mode. Default: 'rel'.
        cooldown (int): Number of epochs to wait before resuming
            normal operation after lr has been reduced. Default: 0.
        min_lr (float or list): A scalar or a list of scalars. A
            lower bound on the learning rate of all param groups
            or each group respectively. Default: 0.
        eps (float): Minimal decay applied to lr. If the difference
            between new and old lr is smaller than eps, the update is
            ignored. Default: 1e-8.
        verbose (bool): If ``True``, prints a message to stdout for
            each update. Default: ``False``.
        warmup_lr (float, list, or None): The learning rate to be reached
            at the end of warmup. Required when `warmup > 0`. Default: None.
        warmup (int): The number of steps over which to warm up. Default: 0.
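
    Example (a minimal usage sketch; ``model``, ``train`` and ``validate``
    are placeholders, not part of this module):
        >>> optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
        >>> scheduler = ReduceLROnPlateauWithWarmup(optimizer, mode='min',
        ...                                         warmup_lr=1e-2, warmup=5)
        >>> for epoch in range(100):
        >>>     train(...)
        >>>     val_loss = validate(...)
        >>>     # step() takes the monitored metric; call it once per epoch
        >>>     scheduler.step(val_loss)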
""" | |
    def __init__(
        self,
        optimizer,
        mode="min",
        factor=0.1,
        patience=10,
        threshold=1e-4,
        threshold_mode="rel",
        cooldown=0,
        min_lr=0,
        eps=1e-8,
        verbose=False,
        warmup_lr=None,
        warmup=0,
    ):
        if factor >= 1.0:
            raise ValueError("Factor should be < 1.0.")
        self.factor = factor

        # Attach optimizer
        if not isinstance(optimizer, Optimizer):
            raise TypeError("{} is not an Optimizer".format(type(optimizer).__name__))
        self.optimizer = optimizer

        if isinstance(min_lr, (list, tuple)):
            if len(min_lr) != len(optimizer.param_groups):
                raise ValueError(
                    "expected {} min_lrs, got {}".format(
                        len(optimizer.param_groups), len(min_lr)
                    )
                )
            self.min_lrs = list(min_lr)
        else:
            self.min_lrs = [min_lr] * len(optimizer.param_groups)

        self.patience = patience
        self.verbose = verbose
        self.cooldown = cooldown
        self.cooldown_counter = 0
        self.mode = mode
        self.threshold = threshold
        self.threshold_mode = threshold_mode
        self.warmup_lr = warmup_lr
        self.warmup = warmup

        self.best = None
        self.num_bad_epochs = None
        self.mode_worse = None  # the worse value for the chosen mode
        self.eps = eps
        self.last_epoch = 0
        self._init_is_better(
            mode=mode, threshold=threshold, threshold_mode=threshold_mode
        )
        self._reset()

    def _prepare_for_warmup(self):
        if self.warmup_lr is not None:
            if isinstance(self.warmup_lr, (list, tuple)):
                if len(self.warmup_lr) != len(self.optimizer.param_groups):
                    raise ValueError(
                        "expected {} warmup_lrs, got {}".format(
                            len(self.optimizer.param_groups), len(self.warmup_lr)
                        )
                    )
                self.warmup_lrs = list(self.warmup_lr)
            else:
                self.warmup_lrs = [self.warmup_lr] * len(self.optimizer.param_groups)
        else:
            self.warmup_lrs = None

        if self.warmup_lrs is not None and self.warmup > self.last_epoch:
            # Per-group lr increment applied at each warmup step; warmup_lr
            # must be set whenever warmup > 0, otherwise there is no target.
            curr_lrs = [group["lr"] for group in self.optimizer.param_groups]
            self.warmup_lr_steps = [
                max(0, (self.warmup_lrs[i] - curr_lrs[i]) / float(self.warmup))
                for i in range(len(curr_lrs))
            ]
        else:
            self.warmup_lr_steps = None

    def _reset(self):
        """Resets num_bad_epochs counter and cooldown counter."""
        self.best = self.mode_worse
        self.cooldown_counter = 0
        self.num_bad_epochs = 0

    def step(self, metrics):
        # convert `metrics` to float, in case it's a zero-dim Tensor
        current = float(metrics)
        epoch = self.last_epoch + 1
        self.last_epoch = epoch

        if epoch <= self.warmup:
            self._increase_lr(epoch)
        else:
            if self.is_better(current, self.best):
                self.best = current
                self.num_bad_epochs = 0
            else:
                self.num_bad_epochs += 1

            if self.in_cooldown:
                self.cooldown_counter -= 1
                self.num_bad_epochs = 0  # ignore any bad epochs in cooldown

            if self.num_bad_epochs > self.patience:
                self._reduce_lr(epoch)
                self.cooldown_counter = self.cooldown
                self.num_bad_epochs = 0

        self._last_lr = [group["lr"] for group in self.optimizer.param_groups]

    def _reduce_lr(self, epoch):
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = max(old_lr * self.factor, self.min_lrs[i])
            if old_lr - new_lr > self.eps:
                param_group["lr"] = new_lr
                if self.verbose:
                    print(
                        "Epoch {:5d}: reducing learning rate"
                        " of group {} to {:.4e}.".format(epoch, i, new_lr)
                    )

    def _increase_lr(self, epoch):
        # used for warmup
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = max(old_lr + self.warmup_lr_steps[i], self.min_lrs[i])
            param_group["lr"] = new_lr
            if self.verbose:
                print(
                    "Epoch {:5d}: increasing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    @property
    def in_cooldown(self):
        # step() reads this as an attribute, so it must be a property;
        # without the decorator the bound method would always be truthy.
        return self.cooldown_counter > 0

    def is_better(self, a, best):
        if self.mode == "min" and self.threshold_mode == "rel":
            rel_epsilon = 1.0 - self.threshold
            return a < best * rel_epsilon
        elif self.mode == "min" and self.threshold_mode == "abs":
            return a < best - self.threshold
        elif self.mode == "max" and self.threshold_mode == "rel":
            rel_epsilon = self.threshold + 1.0
            return a > best * rel_epsilon
        else:  # mode == 'max' and threshold_mode == 'abs':
            return a > best + self.threshold
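
    # Worked example of the threshold logic (values chosen for exposition):
    # with mode='min', threshold_mode='rel', threshold=1e-4 and best=1.0,
    # a metric must fall below 1.0 * (1 - 1e-4) = 0.9999 to count as an
    # improvement, so 0.99990 does not qualify but 0.99989 does. In 'abs'
    # mode the cutoff is best - threshold instead.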

    def _init_is_better(self, mode, threshold, threshold_mode):
        if mode not in {"min", "max"}:
            raise ValueError("mode " + mode + " is unknown!")
        if threshold_mode not in {"rel", "abs"}:
            raise ValueError("threshold mode " + threshold_mode + " is unknown!")

        if mode == "min":
            self.mode_worse = inf
        else:  # mode == 'max':
            self.mode_worse = -inf

        self.mode = mode
        self.threshold = threshold
        self.threshold_mode = threshold_mode
        self._prepare_for_warmup()

    def state_dict(self):
        return {
            key: value for key, value in self.__dict__.items() if key != "optimizer"
        }

    def load_state_dict(self, state_dict):
        self.__dict__.update(state_dict)
        self._init_is_better(
            mode=self.mode, threshold=self.threshold, threshold_mode=self.threshold_mode
        )


class CosineAnnealingLRWithWarmup(object):
""" | |
adjust lr: | |
args: | |
warmup_lr: float or None, the learning rate to be touched after warmup | |
warmup: int, the number of steps to warmup | |
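
    Example (a minimal usage sketch; ``model`` and ``train`` are
    placeholders, not part of this module):
        >>> optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)
        >>> scheduler = CosineAnnealingLRWithWarmup(optimizer, T_max=100,
        ...                                         warmup_lr=1e-2, warmup=10)
        >>> for epoch in range(100):
        >>>     train(...)
        >>>     scheduler.step()  # no metric needed; purely schedule-driven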
""" | |
    def __init__(
        self,
        optimizer,
        T_max,
        last_epoch=-1,
        verbose=False,
        min_lr=0,
        warmup_lr=None,
        warmup=0,
    ):
        self.optimizer = optimizer
        self.T_max = T_max
        self.last_epoch = last_epoch
        self.verbose = verbose
        self.warmup_lr = warmup_lr
        self.warmup = warmup

        if isinstance(min_lr, (list, tuple)):
            if len(min_lr) != len(optimizer.param_groups):
                raise ValueError(
                    "expected {} min_lrs, got {}".format(
                        len(optimizer.param_groups), len(min_lr)
                    )
                )
            self.min_lrs = list(min_lr)
        else:
            self.min_lrs = [min_lr] * len(optimizer.param_groups)
        # Peak lr per group: start from the current lrs and raise them as
        # warmup progresses, so the cosine decay anneals from the warmup
        # peak rather than collapsing to min_lr when warmup is 0.
        self.max_lrs = [group["lr"] for group in optimizer.param_groups]
        self._prepare_for_warmup()

    def step(self):
        epoch = self.last_epoch + 1
        self.last_epoch = epoch
        # last_epoch starts at -1, so epochs 0 .. warmup-1 are the warmup
        # steps; this yields exactly `warmup` increases before decay begins.
        if epoch < self.warmup:
            self._increase_lr(epoch)
        else:
            self._reduce_lr(epoch)

    def _reduce_lr(self, epoch):
        for i, param_group in enumerate(self.optimizer.param_groups):
            # Cosine decay: factor goes from 1 at the end of warmup to 0 at
            # T_max, so the lr anneals from max_lrs[i] down to min_lrs[i].
            progress = float(epoch - self.warmup) / float(
                max(1, self.T_max - self.warmup)
            )
            factor = max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))
            new_lr = max(self.max_lrs[i] * factor, self.min_lrs[i])
            param_group["lr"] = new_lr
            if self.verbose:
                print(
                    "Epoch {:5d}: reducing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    def _increase_lr(self, epoch):
        # used for warmup
        for i, param_group in enumerate(self.optimizer.param_groups):
            old_lr = float(param_group["lr"])
            new_lr = old_lr + self.warmup_lr_steps[i]
            param_group["lr"] = new_lr
            self.max_lrs[i] = max(self.max_lrs[i], new_lr)
            if self.verbose:
                print(
                    "Epoch {:5d}: increasing learning rate"
                    " of group {} to {:.4e}.".format(epoch, i, new_lr)
                )

    def _prepare_for_warmup(self):
        if self.warmup_lr is not None:
            if isinstance(self.warmup_lr, (list, tuple)):
                if len(self.warmup_lr) != len(self.optimizer.param_groups):
                    raise ValueError(
                        "expected {} warmup_lrs, got {}".format(
                            len(self.optimizer.param_groups), len(self.warmup_lr)
                        )
                    )
                self.warmup_lrs = list(self.warmup_lr)
            else:
                self.warmup_lrs = [self.warmup_lr] * len(self.optimizer.param_groups)
        else:
            self.warmup_lrs = None

        if self.warmup_lrs is not None and self.warmup > self.last_epoch:
            # Per-group lr increment applied at each warmup step; warmup_lr
            # must be set whenever warmup > 0, otherwise there is no target.
            curr_lrs = [group["lr"] for group in self.optimizer.param_groups]
            self.warmup_lr_steps = [
                max(0, (self.warmup_lrs[i] - curr_lrs[i]) / float(self.warmup))
                for i in range(len(curr_lrs))
            ]
        else:
            self.warmup_lr_steps = None

    def state_dict(self):
        return {
            key: value for key, value in self.__dict__.items() if key != "optimizer"
        }

    def load_state_dict(self, state_dict):
        self.__dict__.update(state_dict)
        self._prepare_for_warmup()
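

if __name__ == "__main__":
    # Illustrative sanity check (a minimal sketch, not part of the
    # schedulers' API): warm a single parameter group up from 0 to 0.1
    # over 5 steps, then cosine-anneal it towards min_lr by T_max=20.
    import torch

    param = torch.nn.Parameter(torch.zeros(1))
    optimizer = torch.optim.SGD([param], lr=0.0)
    scheduler = CosineAnnealingLRWithWarmup(
        optimizer, T_max=20, min_lr=1e-4, warmup_lr=0.1, warmup=5
    )
    for step in range(20):
        scheduler.step()
        lr = optimizer.param_groups[0]["lr"]
        print("step {:2d}: lr = {:.4e}".format(step + 1, lr))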