# Copyright      2022  Xiaomi Corp.        (authors: Daniel Povey)
#
# See ../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import List, Optional, Union

import torch
from torch.optim import Optimizer


class LRScheduler(object):
    """
    Base-class for learning rate schedulers where the learning-rate depends on
    both the batch and the epoch.

    Subclasses must override :meth:`get_lr`; everything else (bookkeeping of
    epoch/batch counters, writing the computed lrs back into the optimizer's
    param groups) is handled here.
    """

    def __init__(self, optimizer: Optimizer, verbose: bool = False):
        """
        Args:
          optimizer: the wrapped optimizer; each param group gets a
            "base_lr" entry (defaulting to its current "lr") that subclasses
            scale in get_lr().
          verbose: if True, log the learning rate on every update.
        """
        # Attach optimizer
        if not isinstance(optimizer, Optimizer):
            raise TypeError("{} is not an Optimizer".format(type(optimizer).__name__))
        self.optimizer = optimizer
        self.verbose = verbose

        for group in optimizer.param_groups:
            group.setdefault("base_lr", group["lr"])

        self.base_lrs = [group["base_lr"] for group in optimizer.param_groups]

        self.epoch = 0
        self.batch = 0
        # Initialize _last_lr so get_last_lr() is usable even before the first
        # step_batch()/step_epoch() call (previously it raised AttributeError).
        self._last_lr = [group["lr"] for group in optimizer.param_groups]

    def state_dict(self):
        """Returns the state of the scheduler as a :class:`dict`.

        It contains an entry for every variable in self.__dict__ which
        is not the optimizer.
        """
        return {
            # the user might try to override the base_lr, so don't include this
            # in the state.  previously they were included.
            # "base_lrs": self.base_lrs,
            "epoch": self.epoch,
            "batch": self.batch,
        }

    def load_state_dict(self, state_dict):
        """Loads the schedulers state.

        Args:
          state_dict (dict): scheduler state. Should be an object returned
            from a call to :meth:`state_dict`.
        """
        # the things with base_lrs are a work-around for a previous problem
        # where base_lrs were written with the state dict.
        base_lrs = self.base_lrs
        self.__dict__.update(state_dict)
        self.base_lrs = base_lrs

    def get_last_lr(self) -> List[float]:
        """Return last computed learning rate by current scheduler.

        Will be a list of float."""
        return self._last_lr

    def get_lr(self):
        # Compute list of learning rates from self.epoch and self.batch and
        # self.base_lrs; this must be overloaded by the user.
        # e.g. return [some_formula(self.batch, self.epoch, base_lr)
        #              for base_lr in self.base_lrs]
        raise NotImplementedError

    def step_batch(self, batch: Optional[int] = None) -> None:
        # Step the batch index, or just set it.  If `batch` is specified, it
        # must be the batch index from the start of training, i.e. summed over
        # all epochs.
        # You can call this in any order; if you don't provide 'batch', it
        # should of course be called once per batch.
        if batch is not None:
            self.batch = batch
        else:
            self.batch = self.batch + 1
        self._set_lrs()

    def step_epoch(self, epoch: Optional[int] = None):
        # Step the epoch index, or just set it.  If you provide the 'epoch'
        # arg, you should call this at the start of the epoch; if you don't
        # provide the 'epoch' arg, you should call it at the end of the epoch.
        if epoch is not None:
            self.epoch = epoch
        else:
            self.epoch = self.epoch + 1
        self._set_lrs()

    def _set_lrs(self):
        # Recompute the lrs from get_lr() and write them into the optimizer's
        # param groups, caching the result in _last_lr.
        values = self.get_lr()
        assert len(values) == len(self.optimizer.param_groups)

        for i, (param_group, lr) in enumerate(
            zip(self.optimizer.param_groups, values)
        ):
            param_group["lr"] = lr
            self.print_lr(self.verbose, i, lr)
        self._last_lr = [group["lr"] for group in self.optimizer.param_groups]

    def print_lr(self, is_verbose, group, lr):
        """Display the current learning rate."""
        if is_verbose:
            logging.warning(
                f"Epoch={self.epoch}, batch={self.batch}: adjusting learning rate"
                f" of group {group} to {lr:.4e}."
            )


class Eden(LRScheduler):
    """
    Eden scheduler.
    The basic formula (before warmup) is:
      lr = base_lr * (((batch**2 + lr_batches**2) / lr_batches**2) ** -0.25 *
                     (((epoch**2 + lr_epochs**2) / lr_epochs**2) ** -0.25)) * warmup
    where `warmup` increases from linearly 0.5 to 1 over `warmup_batches` batches
    and then stays constant at 1.

    If you don't have the concept of epochs, or one epoch takes a very long time,
    you can replace the notion of 'epoch' with some measure of the amount of data
    processed, e.g. hours of data or frames of data, with 'lr_epochs' being set to
    some measure representing "quite a lot of data": say, one fifth or one third
    of an entire training run, but it doesn't matter much.  You could also use
    Eden2 which has only the notion of batches.

    We suggest base_lr = 0.04 (passed to optimizer) if used with ScaledAdam

    Args:
        optimizer: the optimizer to change the learning rates on
        lr_batches: the number of batches after which we start significantly
              decreasing the learning rate, suggest 5000.
        lr_epochs: the number of epochs after which we start significantly
              decreasing the learning rate, suggest 6 if you plan to do e.g.
              20 to 40 epochs, but may need smaller number if dataset is huge
              and you will do few epochs.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        lr_batches: Union[int, float],
        lr_epochs: Union[int, float],
        warmup_batches: Union[int, float] = 500.0,
        warmup_start: float = 0.5,
        verbose: bool = False,
    ):
        super(Eden, self).__init__(optimizer, verbose)
        self.lr_batches = lr_batches
        self.lr_epochs = lr_epochs
        self.warmup_batches = warmup_batches

        # warmup_start is the fraction of base_lr used at batch 0; it must lie
        # in [0, 1] so the warmup factor interpolates up to exactly 1.
        assert 0.0 <= warmup_start <= 1.0, warmup_start
        self.warmup_start = warmup_start

    def get_lr(self):
        # Decay factor: -0.25 power of the normalized batch and epoch counts;
        # equals 1 at batch=0/epoch=0 and decays smoothly afterwards.
        factor = (
            (self.batch**2 + self.lr_batches**2) / self.lr_batches**2
        ) ** -0.25 * (
            ((self.epoch**2 + self.lr_epochs**2) / self.lr_epochs**2) ** -0.25
        )
        # Linear warmup from warmup_start to 1 over warmup_batches batches,
        # then constant at 1.
        warmup_factor = (
            1.0
            if self.batch >= self.warmup_batches
            else self.warmup_start
            + (1.0 - self.warmup_start) * (self.batch / self.warmup_batches)
        )

        return [x * factor * warmup_factor for x in self.base_lrs]


class FixedLRScheduler(LRScheduler):
    """
    Fixed learning rate scheduler: always returns the base learning rates
    unchanged, regardless of batch or epoch.

    Args:
        optimizer: the optimizer to change the learning rates on
    """

    def __init__(
        self,
        optimizer: Optimizer,
        verbose: bool = False,
    ):
        super(FixedLRScheduler, self).__init__(optimizer, verbose)

    def get_lr(self):
        # Return a fresh copy so callers can't mutate base_lrs through it.
        return list(self.base_lrs)


def _test_eden():
    """Smoke-test Eden with ScaledAdam on a small linear model."""
    m = torch.nn.Linear(100, 100)
    from zipvoice.utils.optim import ScaledAdam

    optim = ScaledAdam(m.parameters(), lr=0.03)

    scheduler = Eden(optim, lr_batches=100, lr_epochs=2, verbose=True)

    for epoch in range(10):
        scheduler.step_epoch(epoch)  # sets epoch to `epoch`

        for step in range(20):
            x = torch.randn(200, 100).detach()
            x.requires_grad = True
            y = m(x)
            dy = torch.randn(200, 100).detach()
            f = (y * dy).sum()
            f.backward()

            optim.step()
            scheduler.step_batch()
            optim.zero_grad()

    logging.info(f"last lr = {scheduler.get_last_lr()}")
    logging.info(f"state dict = {scheduler.state_dict()}")


if __name__ == "__main__":
    torch.set_num_threads(1)
    torch.set_num_interop_threads(1)
    logging.getLogger().setLevel(logging.INFO)
    import subprocess

    # Log the current git state so test runs are reproducible/attributable.
    s = subprocess.check_output(
        "git status -uno .; git log -1; git diff HEAD .",
        shell=True,
    )
    logging.info(s)

    _test_eden()