""" Adapted from https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py Credit to @leo19941227 for remove timm dependencies here : https://github.com/s3prl/passt_hear21/blob/48a0dc1b824641ca59884ced53f5b86053fed141/hear21passt/models/helpers/vit_helpers.py """ import math import logging import warnings from copy import deepcopy import torch from torch import nn from timm.models._hub import download_cached_file # Global variables for rarely used pretrained checkpoint download progress and hash check. # Use set_pretrained_download_progress / set_pretrained_check_hash functions to toggle. _DOWNLOAD_PROGRESS = True _CHECK_HASH = False _logger = logging.getLogger(__name__) def adapt_input_conv(in_chans, conv_weight, input_conv_name="(name not given)"): conv_type = conv_weight.dtype conv_weight = ( conv_weight.float() ) # Some weights are in torch.half, ensure it's float for sum on CPU O, I, J, K = conv_weight.shape if in_chans == 1: print(f"adapt_input_conv: Converted from {I} to 1 channel") if I > 3: assert conv_weight.shape[1] % 3 == 0 # For models with space2depth stems conv_weight = conv_weight.reshape(O, I // 3, 3, J, K) conv_weight = conv_weight.sum(dim=2, keepdim=False) else: conv_weight = conv_weight.sum(dim=1, keepdim=True) elif in_chans != 3: if I != 3: # loading a model pretrained on AudioSet for the downstream-task if I == in_chans: print(f"adapt_input_conv: Loading pretrained weights for {input_conv_name}, " f"Assuming same input-conv and proj-conv configuration (1:1).") pass else: print(f"adapt_input_conv: Converted input conv {input_conv_name} weights from 3 to {in_chans} channel(s)") # NOTE this strategy should be better than random init, but there could be other combinations of # the original RGB input layer weights that'd work better for specific cases. repeat = int(math.ceil(in_chans / 3)) conv_weight = conv_weight.repeat(1, repeat, 1, 1)[:, :in_chans, :, :] conv_weight *= 3 / float(in_chans) conv_weight = conv_weight.to(conv_type) return conv_weight def load_pretrained( model, default_cfg=None, num_classes=1000, in_chans=3, filter_fn=None, strict=True, progress=False, ): """Load pretrained checkpoint Args: model (nn.Module) : PyTorch model module default_cfg (Optional[Dict]): default configuration for pretrained weights / target dataset num_classes (int): num_classes for model in_chans (int): in_chans for model filter_fn (Optional[Callable]): state_dict filter fn for load (takes state_dict, model as args) strict (bool): strict load of checkpoint progress (bool): enable progress bar for weight download """ default_cfg = default_cfg or getattr(model, "default_cfg", None) or {} pretrained_url = default_cfg.get("url", None) if not pretrained_url: _logger.warning( "No pretrained weights exist for this model. Using random initialization." ) return _logger.info(f"Loading pretrained weights from url ({pretrained_url})") pretrained_loc = download_cached_file( pretrained_url, check_hash=_CHECK_HASH, progress=_DOWNLOAD_PROGRESS, ) state_dict = torch.load(pretrained_loc, map_location="cpu") if filter_fn is not None: # for backwards compat with filter fn that take one arg, try one first, the two try: state_dict = filter_fn(state_dict) except TypeError: state_dict = filter_fn(state_dict, model) input_convs = default_cfg.get("first_conv", None) if input_convs is not None and in_chans != 3: if isinstance(input_convs, str): input_convs = (input_convs,) for input_conv_name in input_convs: weight_name = input_conv_name + ".weight" try: state_dict[weight_name] = adapt_input_conv( in_chans, state_dict[weight_name], input_conv_name ) # _logger.info( # f"Converted input conv {input_conv_name} pretrained weights from 3 to {in_chans} channel(s)" # ) except (NotImplementedError, KeyError) as e: if weight_name in state_dict: del state_dict[weight_name] strict = False _logger.warning( f"Unable to convert pretrained {input_conv_name} weights, using random init for this layer." ) classifiers = default_cfg.get("classifier", None) label_offset = default_cfg.get("label_offset", 0) if classifiers is not None: if isinstance(classifiers, str): classifiers = (classifiers,) if num_classes != default_cfg["num_classes"]: for classifier_name in classifiers: # completely discard fully connected if model num_classes doesn't match pretrained weights del state_dict[classifier_name + ".weight"] del state_dict[classifier_name + ".bias"] strict = False elif label_offset > 0: for classifier_name in classifiers: # special case for pretrained weights with an extra background class in pretrained weights classifier_weight = state_dict[classifier_name + ".weight"] state_dict[classifier_name + ".weight"] = classifier_weight[ label_offset: ] classifier_bias = state_dict[classifier_name + ".bias"] state_dict[classifier_name + ".bias"] = classifier_bias[label_offset:] model.load_state_dict(state_dict, strict=strict) def overlay_external_default_cfg(default_cfg, kwargs): """Overlay 'external_default_cfg' in kwargs on top of default_cfg arg.""" external_default_cfg = kwargs.pop("external_default_cfg", None) if external_default_cfg: default_cfg.pop("url", None) # url should come from external cfg default_cfg.pop("hf_hub", None) # hf hub id should come from external cfg default_cfg.update(external_default_cfg) def filter_kwargs(kwargs, names): if not kwargs or not names: return for n in names: kwargs.pop(n, None) def set_default_kwargs(kwargs, names, default_cfg): for n in names: # for legacy reasons, model __init__args uses img_size + in_chans as separate args while # default_cfg has one input_size=(C, H ,W) entry if n == "img_size": input_size = default_cfg.get("input_size", None) if input_size is not None: assert len(input_size) == 3 kwargs.setdefault(n, input_size[-2:]) elif n == "in_chans": input_size = default_cfg.get("input_size", None) if input_size is not None: assert len(input_size) == 3 kwargs.setdefault(n, input_size[0]) else: default_val = default_cfg.get(n, None) if default_val is not None: kwargs.setdefault(n, default_cfg[n]) def update_default_cfg_and_kwargs(default_cfg, kwargs, kwargs_filter): """Update the default_cfg and kwargs before passing to model FIXME this sequence of overlay default_cfg, set default kwargs, filter kwargs could/should be replaced by an improved configuration mechanism Args: default_cfg: input default_cfg (updated in-place) kwargs: keyword args passed to model build fn (updated in-place) kwargs_filter: keyword arg keys that must be removed before model __init__ """ # Overlay default cfg values from `external_default_cfg` if it exists in kwargs overlay_external_default_cfg(default_cfg, kwargs) # Set model __init__ args that can be determined by default_cfg (if not already passed as kwargs) default_kwarg_names = ("num_classes", "global_pool", "in_chans") if default_cfg.get("fixed_input_size", False): # if fixed_input_size exists and is True, model takes an img_size arg that fixes its input size default_kwarg_names += ("img_size",) set_default_kwargs(kwargs, names=default_kwarg_names, default_cfg=default_cfg) # Filter keyword args for task specific model variants (some 'features only' models, etc.) filter_kwargs(kwargs, names=kwargs_filter) def drop_path(x, drop_prob: float = 0.0, training: bool = False): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). This is the same as the DropConnect impl I created for EfficientNet, etc networks, however, the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use 'survival rate' as the argument. """ if drop_prob == 0.0 or not training: return x keep_prob = 1 - drop_prob shape = (x.shape[0],) + (1,) * ( x.ndim - 1 ) # work with diff dim tensors, not just 2D ConvNets random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) random_tensor.floor_() # binarize output = x.div(keep_prob) * random_tensor return output class DropPath(nn.Module): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).""" def __init__(self, drop_prob=None): super(DropPath, self).__init__() self.drop_prob = drop_prob def forward(self, x): return drop_path(x, self.drop_prob, self.training) from torch.nn.init import _calculate_fan_in_and_fan_out def _no_grad_trunc_normal_(tensor, mean, std, a, b): # Cut & paste from PyTorch official master until it's in a few official releases - RW # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf def norm_cdf(x): # Computes standard normal cumulative distribution function return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 if (mean < a - 2 * std) or (mean > b + 2 * std): warnings.warn( "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " "The distribution of values may be incorrect.", stacklevel=2, ) with torch.no_grad(): # Values are generated by using a truncated uniform distribution and # then using the inverse CDF for the normal distribution. # Get upper and lower cdf values l = norm_cdf((a - mean) / std) u = norm_cdf((b - mean) / std) # Uniformly fill tensor with values from [l, u], then translate to # [2l-1, 2u-1]. tensor.uniform_(2 * l - 1, 2 * u - 1) # Use inverse cdf transform for normal distribution to get truncated # standard normal tensor.erfinv_() # Transform to proper mean, std tensor.mul_(std * math.sqrt(2.0)) tensor.add_(mean) # Clamp to ensure it's in the proper range tensor.clamp_(min=a, max=b) return tensor def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0): r"""Fills the input Tensor with values drawn from a truncated normal distribution. The values are effectively drawn from the normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)` with values outside :math:`[a, b]` redrawn until they are within the bounds. The method used for generating the random values works best when :math:`a \leq \text{mean} \leq b`. Args: tensor: an n-dimensional `torch.Tensor` mean: the mean of the normal distribution std: the standard deviation of the normal distribution a: the minimum cutoff value b: the maximum cutoff value Examples: >>> w = torch.empty(3, 5) >>> nn.init.trunc_normal_(w) """ return _no_grad_trunc_normal_(tensor, mean, std, a, b) def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"): fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor) if mode == "fan_in": denom = fan_in elif mode == "fan_out": denom = fan_out elif mode == "fan_avg": denom = (fan_in + fan_out) / 2 variance = scale / denom if distribution == "truncated_normal": # constant is stddev of standard normal truncated to (-2, 2) trunc_normal_(tensor, std=math.sqrt(variance) / 0.87962566103423978) elif distribution == "normal": tensor.normal_(std=math.sqrt(variance)) elif distribution == "uniform": bound = math.sqrt(3 * variance) tensor.uniform_(-bound, bound) else: raise ValueError(f"invalid distribution {distribution}") def lecun_normal_(tensor): variance_scaling_(tensor, mode="fan_in", distribution="truncated_normal") def build_model_with_cfg( model_cls, variant: str, pretrained: bool, default_cfg: dict, model_cfg=None, feature_cfg=None, pretrained_strict: bool = True, pretrained_filter_fn=None, pretrained_custom_load=False, kwargs_filter=None, **kwargs, ): """Build model with specified default_cfg and optional model_cfg This helper fn aids in the construction of a model including: * handling default_cfg and associated pretained weight loading * passing through optional model_cfg for models with config based arch spec * features_only model adaptation * pruning config / model adaptation Args: model_cls (nn.Module): model class variant (str): model variant name pretrained (bool): load pretrained weights default_cfg (dict): model's default pretrained/task config model_cfg (Optional[Dict]): model's architecture config feature_cfg (Optional[Dict]: feature extraction adapter config pretrained_strict (bool): load pretrained weights strictly pretrained_filter_fn (Optional[Callable]): filter callable for pretrained weights pretrained_custom_load (bool): use custom load fn, to load numpy or other non PyTorch weights kwargs_filter (Optional[Tuple]): kwargs to filter before passing to model **kwargs: model args passed through to model __init__ """ pruned = kwargs.pop("pruned", False) features = False feature_cfg = feature_cfg or {} default_cfg = deepcopy(default_cfg) if default_cfg else {} update_default_cfg_and_kwargs(default_cfg, kwargs, kwargs_filter) default_cfg.setdefault("architecture", variant) # Setup for feature extraction wrapper done at end of this fn if kwargs.pop("features_only", False): features = True feature_cfg.setdefault("out_indices", (0, 1, 2, 3, 4)) if "out_indices" in kwargs: feature_cfg["out_indices"] = kwargs.pop("out_indices") # Build the model model = ( model_cls(**kwargs) if model_cfg is None else model_cls(cfg=model_cfg, **kwargs) ) model.default_cfg = default_cfg # For classification models, check class attr, then kwargs, then default to 1k, otherwise 0 for feats num_classes_pretrained = ( 0 if features else getattr(model, "num_classes", kwargs.get("num_classes", 1000)) ) if pretrained: assert not pretrained_custom_load, "URL should not contain npz for PASST models" load_pretrained( model, num_classes=num_classes_pretrained, in_chans=kwargs.get("in_chans", 3), filter_fn=pretrained_filter_fn, strict=pretrained_strict, ) return model