Spaces:
Sleeping
Sleeping
""" | |
Adapted from https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py | |
Credit to @leo19941227 for removing the timm dependencies here: https://github.com/s3prl/passt_hear21/blob/48a0dc1b824641ca59884ced53f5b86053fed141/hear21passt/models/helpers/vit_helpers.py
""" | |
import math | |
import logging | |
import warnings | |
from copy import deepcopy | |
import torch | |
from torch import nn | |
from timm.models._hub import download_cached_file | |
# Global variables for rarely used pretrained checkpoint download progress and hash check.
# load_pretrained() forwards them to download_cached_file() as `progress` and `check_hash`.
# NOTE(review): the original comment says to toggle these through
# set_pretrained_download_progress / set_pretrained_check_hash; those helpers are not
# defined in the visible part of this file - confirm they exist before relying on them.
_DOWNLOAD_PROGRESS = True
_CHECK_HASH = False
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
def adapt_input_conv(in_chans, conv_weight, input_conv_name="(name not given)"):
    """Adapt a pretrained input-convolution weight to a model with ``in_chans`` channels.

    ``I`` below is the channel count of the pretrained weight (``conv_weight.shape[1]``).

    * ``in_chans == 1``: the pretrained channels are summed. For ``I <= 3`` all channels
      collapse into one; for ``I > 3`` the channels are grouped in triples and every
      triple is summed, giving ``I // 3`` channels.
    * ``in_chans == 3``: the weight is returned as is.
    * ``I == in_chans``: the weight already matches and is returned as is.
    * ``I == 3``: the three channels are tiled along the channel axis, cut to
      ``in_chans`` and rescaled by ``3 / in_chans``.
    * any other combination cannot be converted and raises ``NotImplementedError``.

    Args:
        in_chans (int): number of input channels of the target model.
        conv_weight (torch.Tensor): 4-D pretrained weight, unpacked as ``(O, I, J, K)``.
        input_conv_name (str): layer name, only used in the printed messages.

    Returns:
        torch.Tensor: the adapted weight, cast back to the dtype of ``conv_weight``.

    Raises:
        NotImplementedError: if the pretrained layout cannot be converted. The
            checkpoint loader in this module catches it and falls back to random
            initialisation for that layer.
    """
    conv_type = conv_weight.dtype
    # Some weights are in torch.half, ensure it's float for sum on CPU
    conv_weight = conv_weight.float()
    O, I, J, K = conv_weight.shape
    if in_chans == 1:
        if I > 3 and I % 3 != 0:
            # Raised instead of asserted: asserts vanish under `python -O`, and the
            # loader only knows how to recover from NotImplementedError.
            raise NotImplementedError(
                f"Cannot convert {input_conv_name} weights with {I} input channels "
                f"to 1 channel: {I} is not a multiple of 3."
            )
        print(f"adapt_input_conv: Converted from {I} to 1 channel")
        if I > 3:
            # For models with space2depth stems
            conv_weight = conv_weight.reshape(O, I // 3, 3, J, K)
            conv_weight = conv_weight.sum(dim=2, keepdim=False)
        else:
            conv_weight = conv_weight.sum(dim=1, keepdim=True)
    elif in_chans != 3:
        if I == in_chans:
            # loading a model pretrained on AudioSet for the downstream-task
            print(f"adapt_input_conv: Loading pretrained weights for {input_conv_name}, "
                  f"Assuming same input-conv and proj-conv configuration (1:1).")
        elif I == 3:
            print(f"adapt_input_conv: Converted input conv {input_conv_name} weights from 3 to {in_chans} channel(s)")
            # NOTE this strategy should be better than random init, but there could be other combinations of
            # the original RGB input layer weights that'd work better for specific cases.
            repeat = int(math.ceil(in_chans / 3))
            conv_weight = conv_weight.repeat(1, repeat, 1, 1)[:, :in_chans, :, :]
            conv_weight *= 3 / float(in_chans)
        else:
            # Previously this case fell through and returned the unconverted weight,
            # which only failed later as a shape mismatch in load_state_dict.
            raise NotImplementedError(
                f"Cannot convert {input_conv_name} weights from {I} to {in_chans} channel(s)."
            )
    return conv_weight.to(conv_type)
def load_pretrained(
    model,
    default_cfg=None,
    num_classes=1000,
    in_chans=3,
    filter_fn=None,
    strict=True,
    progress=False,
):
    """Load pretrained checkpoint

    The checkpoint URL is read from ``default_cfg["url"]``. When no URL is configured a
    warning is logged and the model is left untouched.

    Args:
        model (nn.Module) : PyTorch model module
        default_cfg (Optional[Dict]): default configuration for pretrained weights / target dataset;
            falls back to ``model.default_cfg`` when not given
        num_classes (int): num_classes for model
        in_chans (int): in_chans for model
        filter_fn (Optional[Callable]): state_dict filter fn for load (takes state_dict, model as args)
        strict (bool): strict load of checkpoint; switched off internally whenever
            entries had to be dropped from the state dict
        progress (bool): enable progress bar for weight download (the module-level
            ``_DOWNLOAD_PROGRESS`` flag enables it as well)
    """
    default_cfg = default_cfg or getattr(model, "default_cfg", None) or {}
    pretrained_url = default_cfg.get("url", None)
    if not pretrained_url:
        _logger.warning(
            "No pretrained weights exist for this model. Using random initialization."
        )
        return
    _logger.info(f"Loading pretrained weights from url ({pretrained_url})")
    pretrained_loc = download_cached_file(
        pretrained_url,
        check_hash=_CHECK_HASH,
        # Honour the caller's request in addition to the module-wide switch; the
        # argument used to be accepted but ignored.
        progress=progress or _DOWNLOAD_PROGRESS,
    )
    # SECURITY NOTE: torch.load unpickles the file. Only point default_cfg["url"] at
    # checkpoints from a trusted source.
    state_dict = torch.load(pretrained_loc, map_location="cpu")
    if filter_fn is not None:
        # for backwards compat with filter fns that take one arg, try one first, then two
        try:
            state_dict = filter_fn(state_dict)
        except TypeError:
            state_dict = filter_fn(state_dict, model)

    input_convs = default_cfg.get("first_conv", None)
    if input_convs is not None and in_chans != 3:
        if isinstance(input_convs, str):
            input_convs = (input_convs,)
        for input_conv_name in input_convs:
            weight_name = input_conv_name + ".weight"
            try:
                state_dict[weight_name] = adapt_input_conv(
                    in_chans, state_dict[weight_name], input_conv_name
                )
            except (NotImplementedError, KeyError):
                # Conversion unsupported, or the weight is missing from the checkpoint:
                # drop it (if present) and let the layer keep its fresh initialisation.
                state_dict.pop(weight_name, None)
                strict = False
                _logger.warning(
                    f"Unable to convert pretrained {input_conv_name} weights, using random init for this layer."
                )

    classifiers = default_cfg.get("classifier", None)
    label_offset = default_cfg.get("label_offset", 0)
    if classifiers is not None:
        if isinstance(classifiers, str):
            classifiers = (classifiers,)
        if num_classes != default_cfg["num_classes"]:
            for classifier_name in classifiers:
                # completely discard fully connected if model num_classes doesn't match pretrained weights;
                # pop() tolerates checkpoints (or filter fns) that already removed the head
                state_dict.pop(classifier_name + ".weight", None)
                state_dict.pop(classifier_name + ".bias", None)
            strict = False
        elif label_offset > 0:
            for classifier_name in classifiers:
                # special case for pretrained weights with an extra background class in pretrained weights
                classifier_weight = state_dict[classifier_name + ".weight"]
                state_dict[classifier_name + ".weight"] = classifier_weight[
                    label_offset:
                ]
                classifier_bias = state_dict[classifier_name + ".bias"]
                state_dict[classifier_name + ".bias"] = classifier_bias[label_offset:]

    model.load_state_dict(state_dict, strict=strict)
def overlay_external_default_cfg(default_cfg, kwargs):
    """Merge ``kwargs['external_default_cfg']`` into ``default_cfg`` (both changed in place).

    The ``external_default_cfg`` entry is always removed from ``kwargs``. When it is
    non-empty, the ``url`` and ``hf_hub`` entries of ``default_cfg`` are discarded first,
    so those two values can only come from the external config.
    """
    override = kwargs.pop("external_default_cfg", None)
    if not override:
        return
    # url and hf hub id should come from the external cfg
    for stale_key in ("url", "hf_hub"):
        default_cfg.pop(stale_key, None)
    default_cfg.update(override)
def filter_kwargs(kwargs, names):
    """Remove every key listed in ``names`` from ``kwargs`` in place.

    Does nothing when either argument is empty or ``None``; keys that are not present
    are skipped silently.
    """
    if kwargs and names:
        for unwanted in names:
            kwargs.pop(unwanted, None)
def set_default_kwargs(kwargs, names, default_cfg):
    """Fill in missing ``kwargs`` entries from ``default_cfg`` (``kwargs`` changed in place).

    Values already present in ``kwargs`` are never overwritten. For legacy reasons the
    model ``__init__`` takes ``img_size`` and ``in_chans`` separately while ``default_cfg``
    stores a single ``input_size=(C, H, W)`` entry, so those two names are derived from it:
    ``img_size`` is its last two items and ``in_chans`` its first. Every other name is
    copied from ``default_cfg`` when its value there is not ``None``.
    """
    for name in names:
        if name in ("img_size", "in_chans"):
            chw = default_cfg.get("input_size", None)
            if chw is None:
                continue
            assert len(chw) == 3
            value = chw[-2:] if name == "img_size" else chw[0]
        else:
            value = default_cfg.get(name, None)
            if value is None:
                continue
        kwargs.setdefault(name, value)
def update_default_cfg_and_kwargs(default_cfg, kwargs, kwargs_filter):
    """Prepare ``default_cfg`` and ``kwargs`` for the model constructor.

    Runs three steps in order: overlay an ``external_default_cfg`` found in ``kwargs``,
    fill in constructor defaults derived from ``default_cfg``, then strip the filtered keys.

    FIXME this sequence of overlay default_cfg, set default kwargs, filter kwargs
    could/should be replaced by an improved configuration mechanism

    Args:
        default_cfg: input default_cfg (updated in-place)
        kwargs: keyword args passed to model build fn (updated in-place)
        kwargs_filter: keyword arg keys that must be removed before model __init__
    """
    # Step 1: values from `external_default_cfg` (if present in kwargs) win over default_cfg.
    overlay_external_default_cfg(default_cfg, kwargs)

    # Step 2: __init__ args that can be derived from default_cfg, unless already passed.
    derived_names = ["num_classes", "global_pool", "in_chans"]
    if default_cfg.get("fixed_input_size", False):
        # A truthy fixed_input_size means the model takes an img_size arg fixing its input size.
        derived_names.append("img_size")
    set_default_kwargs(kwargs, names=tuple(derived_names), default_cfg=default_cfg)

    # Step 3: drop keyword args that task specific model variants must not receive.
    filter_kwargs(kwargs, names=kwargs_filter)
def drop_path(x, drop_prob: float = 0.0, training: bool = False):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.

    Args:
        x (torch.Tensor): input whose first dimension indexes the samples.
        drop_prob (float): probability of zeroing a whole sample. ``None`` and ``0.0``
            both disable dropping.
        training (bool): paths are only dropped when this is true.

    Returns:
        torch.Tensor: ``x`` itself when nothing is dropped; otherwise a new tensor where
        each sample is either zeroed or scaled by ``1 / (1 - drop_prob)``.
    """
    # `not drop_prob` also covers None, which is the default the DropPath module passes in.
    if not drop_prob or not training:
        return x
    keep_prob = 1 - drop_prob
    # One mask value per sample, broadcast over all remaining dims
    # (work with diff dim tensors, not just 2D ConvNets).
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    if keep_prob > 0.0:
        return x.div(keep_prob) * random_tensor
    # drop_prob == 1: every sample is dropped. Skip the rescale, which would divide by
    # zero and turn the output into NaN instead of zeros.
    return x * random_tensor
class DropPath(nn.Module):
    """Module wrapper around ``drop_path`` (Stochastic Depth per sample).

    Intended for the main path of residual blocks. The drop probability is stored on the
    instance, and paths are only dropped according to the module's ``training`` flag.
    """

    def __init__(self, drop_prob=None):
        super().__init__()
        # Probability handed to drop_path() on every forward call.
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, drop_prob=self.drop_prob, training=self.training)
from torch.nn.init import _calculate_fan_in_and_fan_out | |
def _no_grad_trunc_normal_(tensor, mean, std, a, b): | |
# Cut & paste from PyTorch official master until it's in a few official releases - RW | |
# Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf | |
def norm_cdf(x): | |
# Computes standard normal cumulative distribution function | |
return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 | |
if (mean < a - 2 * std) or (mean > b + 2 * std): | |
warnings.warn( | |
"mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " | |
"The distribution of values may be incorrect.", | |
stacklevel=2, | |
) | |
with torch.no_grad(): | |
# Values are generated by using a truncated uniform distribution and | |
# then using the inverse CDF for the normal distribution. | |
# Get upper and lower cdf values | |
l = norm_cdf((a - mean) / std) | |
u = norm_cdf((b - mean) / std) | |
# Uniformly fill tensor with values from [l, u], then translate to | |
# [2l-1, 2u-1]. | |
tensor.uniform_(2 * l - 1, 2 * u - 1) | |
# Use inverse cdf transform for normal distribution to get truncated | |
# standard normal | |
tensor.erfinv_() | |
# Transform to proper mean, std | |
tensor.mul_(std * math.sqrt(2.0)) | |
tensor.add_(mean) | |
# Clamp to ensure it's in the proper range | |
tensor.clamp_(min=a, max=b) | |
return tensor | |
def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
    r"""Fill ``tensor`` in place with samples from a truncated normal distribution.

    The values follow :math:`\mathcal{N}(\text{mean}, \text{std}^2)` restricted to
    :math:`[a, b]`. The sampling method works best when
    :math:`a \leq \text{mean} \leq b`.

    Args:
        tensor: an n-dimensional `torch.Tensor`
        mean: the mean of the normal distribution
        std: the standard deviation of the normal distribution
        a: the minimum cutoff value
        b: the maximum cutoff value

    Returns:
        The same ``tensor``, filled in place.

    Examples:
        >>> w = torch.empty(3, 5)
        >>> trunc_normal_(w)
    """
    filled = _no_grad_trunc_normal_(tensor, mean, std, a, b)
    return filled
def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"):
    """Initialise ``tensor`` in place with variance ``scale / denom``.

    ``denom`` is chosen by ``mode`` from the tensor's fan-in and fan-out as reported by
    ``torch.nn.init._calculate_fan_in_and_fan_out``.

    Args:
        tensor (torch.Tensor): tensor to fill; may be an ``nn.Parameter``.
        scale (float): numerator of the target variance.
        mode (str): ``"fan_in"``, ``"fan_out"`` or ``"fan_avg"`` (mean of the two).
        distribution (str): ``"truncated_normal"``, ``"normal"`` or ``"uniform"``.

    Raises:
        ValueError: if ``mode`` or ``distribution`` is not one of the values above.
    """
    fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
    if mode == "fan_in":
        denom = fan_in
    elif mode == "fan_out":
        denom = fan_out
    elif mode == "fan_avg":
        denom = (fan_in + fan_out) / 2
    else:
        # Without this branch an unknown mode surfaced as an UnboundLocalError on `denom`.
        raise ValueError(f"invalid mode {mode}")

    variance = scale / denom

    if distribution == "truncated_normal":
        # constant is stddev of standard normal truncated to (-2, 2)
        trunc_normal_(tensor, std=math.sqrt(variance) / 0.87962566103423978)
    elif distribution == "normal":
        # no_grad: in-place ops on a leaf tensor that requires grad are otherwise rejected.
        with torch.no_grad():
            tensor.normal_(std=math.sqrt(variance))
    elif distribution == "uniform":
        # A uniform distribution on [-bound, bound] has variance bound**2 / 3.
        bound = math.sqrt(3 * variance)
        with torch.no_grad():
            tensor.uniform_(-bound, bound)
    else:
        raise ValueError(f"invalid distribution {distribution}")
def lecun_normal_(tensor):
    """Initialise ``tensor`` in place via ``variance_scaling_`` with fan-in scaling and a truncated normal."""
    variance_scaling_(tensor, distribution="truncated_normal", mode="fan_in")
def build_model_with_cfg(
    model_cls,
    variant: str,
    pretrained: bool,
    default_cfg: dict,
    model_cfg=None,
    feature_cfg=None,
    pretrained_strict: bool = True,
    pretrained_filter_fn=None,
    pretrained_custom_load=False,
    kwargs_filter=None,
    **kwargs,
):
    """Build model with specified default_cfg and optional model_cfg

    Steps performed here:
      * work on a deep copy of default_cfg, merge it with kwargs and record the variant
        name under "architecture" if not already set
      * consume the "pruned", "features_only" and "out_indices" kwargs so they never reach
        the model constructor (the feature config is only collected here, no wrapper is applied)
      * instantiate the model, passing model_cfg as ``cfg`` when given
      * attach the resolved default_cfg to the model and optionally load pretrained weights

    Args:
        model_cls (nn.Module): model class
        variant (str): model variant name
        pretrained (bool): load pretrained weights
        default_cfg (dict): model's default pretrained/task config
        model_cfg (Optional[Dict]): model's architecture config
        feature_cfg (Optional[Dict]): feature extraction adapter config
        pretrained_strict (bool): load pretrained weights strictly
        pretrained_filter_fn (Optional[Callable]): filter callable for pretrained weights
        pretrained_custom_load (bool): must be falsy when ``pretrained`` is set; custom
            (numpy / non PyTorch) loading is rejected by an assertion
        kwargs_filter (Optional[Tuple]): kwargs to filter before passing to model
        **kwargs: model args passed through to model __init__

    Returns:
        The constructed model, with ``default_cfg`` set as an attribute.
    """
    # Consumed so that it is not forwarded to the constructor; the value itself is unused.
    kwargs.pop("pruned", False)

    if not feature_cfg:
        feature_cfg = {}
    resolved_cfg = deepcopy(default_cfg) if default_cfg else {}
    update_default_cfg_and_kwargs(resolved_cfg, kwargs, kwargs_filter)
    resolved_cfg.setdefault("architecture", variant)

    # Collect the feature extraction settings requested through kwargs.
    features_only = bool(kwargs.pop("features_only", False))
    if features_only:
        if "out_indices" in kwargs:
            feature_cfg["out_indices"] = kwargs.pop("out_indices")
        else:
            feature_cfg.setdefault("out_indices", (0, 1, 2, 3, 4))

    # Build the model
    if model_cfg is None:
        model = model_cls(**kwargs)
    else:
        model = model_cls(cfg=model_cfg, **kwargs)
    model.default_cfg = resolved_cfg

    # For classification models, check class attr, then kwargs, then default to 1k, otherwise 0 for feats
    if features_only:
        num_classes_pretrained = 0
    else:
        num_classes_pretrained = getattr(
            model, "num_classes", kwargs.get("num_classes", 1000)
        )

    if pretrained:
        assert not pretrained_custom_load, "URL should not contain npz for PASST models"
        load_pretrained(
            model,
            num_classes=num_classes_pretrained,
            in_chans=kwargs.get("in_chans", 3),
            filter_fn=pretrained_filter_fn,
            strict=pretrained_strict,
        )
    return model