"""
Adapted from https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py
Credit to @leo19941227 for removing the timm dependencies here: https://github.com/s3prl/passt_hear21/blob/48a0dc1b824641ca59884ced53f5b86053fed141/hear21passt/models/helpers/vit_helpers.py
"""
import math
import logging
import warnings
from copy import deepcopy
import torch
from torch import nn
from torch.nn.init import _calculate_fan_in_and_fan_out
from timm.models._hub import download_cached_file
# Global variables for rarely used pretrained checkpoint download progress and hash check.
# Use set_pretrained_download_progress / set_pretrained_check_hash functions to toggle.
_DOWNLOAD_PROGRESS = True
_CHECK_HASH = False
_logger = logging.getLogger(__name__)
def adapt_input_conv(in_chans, conv_weight, input_conv_name="(name not given)"):
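    """Adapt a pretrained input conv weight of shape (O, I, J, K) to a different
    number of input channels: 1-channel targets sum over the RGB (or
    space2depth) axis; other channel counts tile the RGB weights and rescale so
    activation magnitudes stay roughly unchanged.
    """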
conv_type = conv_weight.dtype
    conv_weight = conv_weight.float()  # some weights are torch.half; ensure float for sum on CPU
O, I, J, K = conv_weight.shape
if in_chans == 1:
        print(f"adapt_input_conv: Converting input conv {input_conv_name} from {I} to 1 channel")
if I > 3:
assert conv_weight.shape[1] % 3 == 0
# For models with space2depth stems
conv_weight = conv_weight.reshape(O, I // 3, 3, J, K)
conv_weight = conv_weight.sum(dim=2, keepdim=False)
else:
conv_weight = conv_weight.sum(dim=1, keepdim=True)
    elif in_chans != 3:
        if I != 3:
            # loading a model pretrained on AudioSet for the downstream task
            if I == in_chans:
                print(f"adapt_input_conv: Loading pretrained weights for {input_conv_name}, "
                      f"assuming same input-conv and proj-conv configuration (1:1).")
            else:
                # caught in load_pretrained, which falls back to random init for this layer
                raise NotImplementedError("Weight format not supported by conversion.")
        else:
            print(f"adapt_input_conv: Converted input conv {input_conv_name} weights from 3 to {in_chans} channel(s)")
            # NOTE this strategy should be better than random init, but there could be other combinations of
            # the original RGB input layer weights that'd work better for specific cases.
            repeat = int(math.ceil(in_chans / 3))
            conv_weight = conv_weight.repeat(1, repeat, 1, 1)[:, :in_chans, :, :]
            conv_weight *= 3 / float(in_chans)
conv_weight = conv_weight.to(conv_type)
return conv_weight
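# A minimal sketch of the adaptation above (shapes are illustrative, not from a
# real checkpoint): a ViT RGB patch-embed kernel summed down to the single
# spectrogram channel, or tiled up to five channels:
#
#   w = torch.randn(768, 3, 16, 16)       # (O, I, J, K)
#   adapt_input_conv(1, w).shape          # -> torch.Size([768, 1, 16, 16])
#   adapt_input_conv(5, w).shape          # -> torch.Size([768, 5, 16, 16])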
def load_pretrained(
model,
default_cfg=None,
num_classes=1000,
in_chans=3,
filter_fn=None,
strict=True,
progress=False,
):
"""Load pretrained checkpoint
Args:
model (nn.Module) : PyTorch model module
default_cfg (Optional[Dict]): default configuration for pretrained weights / target dataset
num_classes (int): num_classes for model
in_chans (int): in_chans for model
filter_fn (Optional[Callable]): state_dict filter fn for load (takes state_dict, model as args)
strict (bool): strict load of checkpoint
progress (bool): enable progress bar for weight download
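
    A minimal usage sketch (the cfg values are hypothetical, not a shipped checkpoint):

        model.default_cfg = {"url": "https://example.com/weights.pth",
                             "first_conv": "patch_embed.proj",
                             "classifier": "head", "num_classes": 527}
        load_pretrained(model, num_classes=50, in_chans=1, strict=False)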
"""
default_cfg = default_cfg or getattr(model, "default_cfg", None) or {}
pretrained_url = default_cfg.get("url", None)
if not pretrained_url:
_logger.warning(
"No pretrained weights exist for this model. Using random initialization."
)
return
_logger.info(f"Loading pretrained weights from url ({pretrained_url})")
    pretrained_loc = download_cached_file(
        pretrained_url,
        check_hash=_CHECK_HASH,
        progress=progress or _DOWNLOAD_PROGRESS,  # honor the arg; the module-level flag can force it on
    )
state_dict = torch.load(pretrained_loc, map_location="cpu")
if filter_fn is not None:
        # for backwards compat with filter fns that take one arg, try one arg first, then two
try:
state_dict = filter_fn(state_dict)
except TypeError:
state_dict = filter_fn(state_dict, model)
input_convs = default_cfg.get("first_conv", None)
if input_convs is not None and in_chans != 3:
if isinstance(input_convs, str):
input_convs = (input_convs,)
for input_conv_name in input_convs:
weight_name = input_conv_name + ".weight"
try:
state_dict[weight_name] = adapt_input_conv(
in_chans, state_dict[weight_name], input_conv_name
)
            except (NotImplementedError, KeyError):
if weight_name in state_dict:
del state_dict[weight_name]
strict = False
_logger.warning(
f"Unable to convert pretrained {input_conv_name} weights, using random init for this layer."
)
classifiers = default_cfg.get("classifier", None)
label_offset = default_cfg.get("label_offset", 0)
if classifiers is not None:
if isinstance(classifiers, str):
classifiers = (classifiers,)
if num_classes != default_cfg["num_classes"]:
for classifier_name in classifiers:
# completely discard fully connected if model num_classes doesn't match pretrained weights
del state_dict[classifier_name + ".weight"]
del state_dict[classifier_name + ".bias"]
strict = False
elif label_offset > 0:
for classifier_name in classifiers:
# special case for pretrained weights with an extra background class in pretrained weights
classifier_weight = state_dict[classifier_name + ".weight"]
state_dict[classifier_name + ".weight"] = classifier_weight[
label_offset:
]
classifier_bias = state_dict[classifier_name + ".bias"]
state_dict[classifier_name + ".bias"] = classifier_bias[label_offset:]
model.load_state_dict(state_dict, strict=strict)
def overlay_external_default_cfg(default_cfg, kwargs):
"""Overlay 'external_default_cfg' in kwargs on top of default_cfg arg."""
external_default_cfg = kwargs.pop("external_default_cfg", None)
if external_default_cfg:
default_cfg.pop("url", None) # url should come from external cfg
default_cfg.pop("hf_hub", None) # hf hub id should come from external cfg
default_cfg.update(external_default_cfg)
def filter_kwargs(kwargs, names):
if not kwargs or not names:
return
for n in names:
kwargs.pop(n, None)
def set_default_kwargs(kwargs, names, default_cfg):
for n in names:
        # for legacy reasons, model __init__ args use img_size + in_chans as separate args while
        # default_cfg has a single input_size=(C, H, W) entry
if n == "img_size":
input_size = default_cfg.get("input_size", None)
if input_size is not None:
assert len(input_size) == 3
kwargs.setdefault(n, input_size[-2:])
elif n == "in_chans":
input_size = default_cfg.get("input_size", None)
if input_size is not None:
assert len(input_size) == 3
kwargs.setdefault(n, input_size[0])
else:
default_val = default_cfg.get(n, None)
if default_val is not None:
kwargs.setdefault(n, default_cfg[n])
def update_default_cfg_and_kwargs(default_cfg, kwargs, kwargs_filter):
"""Update the default_cfg and kwargs before passing to model
FIXME this sequence of overlay default_cfg, set default kwargs, filter kwargs
could/should be replaced by an improved configuration mechanism
Args:
default_cfg: input default_cfg (updated in-place)
kwargs: keyword args passed to model build fn (updated in-place)
kwargs_filter: keyword arg keys that must be removed before model __init__
"""
# Overlay default cfg values from `external_default_cfg` if it exists in kwargs
overlay_external_default_cfg(default_cfg, kwargs)
# Set model __init__ args that can be determined by default_cfg (if not already passed as kwargs)
default_kwarg_names = ("num_classes", "global_pool", "in_chans")
if default_cfg.get("fixed_input_size", False):
# if fixed_input_size exists and is True, model takes an img_size arg that fixes its input size
default_kwarg_names += ("img_size",)
set_default_kwargs(kwargs, names=default_kwarg_names, default_cfg=default_cfg)
# Filter keyword args for task specific model variants (some 'features only' models, etc.)
filter_kwargs(kwargs, names=kwargs_filter)
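# A minimal sketch of the flow above (hypothetical cfg values, not a shipped
# config):
#
#   cfg = {"num_classes": 527, "input_size": (1, 128, 998), "fixed_input_size": True}
#   kwargs = {"num_classes": 50}
#   update_default_cfg_and_kwargs(cfg, kwargs, kwargs_filter=("out_indices",))
#   # kwargs is now {"num_classes": 50, "in_chans": 1, "img_size": (128, 998)}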
def drop_path(x, drop_prob: float = 0.0, training: bool = False):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    This is the same as the DropConnect impl I created for EfficientNet, etc. networks; however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted to
    change the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.
"""
if drop_prob == 0.0 or not training:
return x
keep_prob = 1 - drop_prob
shape = (x.shape[0],) + (1,) * (
x.ndim - 1
) # work with diff dim tensors, not just 2D ConvNets
random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
random_tensor.floor_() # binarize
output = x.div(keep_prob) * random_tensor
return output
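# A quick sanity sketch of the scaling above: with drop_prob=0.2 each sample's
# residual branch is zeroed with probability 0.2, and survivors are scaled by
# 1/keep_prob so the expected activation is unchanged:
#
#   x = torch.ones(4, 3, 8, 8)
#   y = drop_path(x, drop_prob=0.2, training=True)
#   # each of the 4 samples in y is either all zeros or all 1/0.8 = 1.25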
class DropPath(nn.Module):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks)."""
    def __init__(self, drop_prob=0.0):
        # default to 0.0 rather than None so 1 - drop_prob in drop_path() is always well-defined
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
def forward(self, x):
return drop_path(x, self.drop_prob, self.training)
def _no_grad_trunc_normal_(tensor, mean, std, a, b):
# Cut & paste from PyTorch official master until it's in a few official releases - RW
# Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
def norm_cdf(x):
# Computes standard normal cumulative distribution function
return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0
if (mean < a - 2 * std) or (mean > b + 2 * std):
warnings.warn(
"mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
"The distribution of values may be incorrect.",
stacklevel=2,
)
with torch.no_grad():
# Values are generated by using a truncated uniform distribution and
# then using the inverse CDF for the normal distribution.
# Get upper and lower cdf values
l = norm_cdf((a - mean) / std)
u = norm_cdf((b - mean) / std)
# Uniformly fill tensor with values from [l, u], then translate to
# [2l-1, 2u-1].
tensor.uniform_(2 * l - 1, 2 * u - 1)
# Use inverse cdf transform for normal distribution to get truncated
# standard normal
tensor.erfinv_()
# Transform to proper mean, std
tensor.mul_(std * math.sqrt(2.0))
tensor.add_(mean)
# Clamp to ensure it's in the proper range
tensor.clamp_(min=a, max=b)
return tensor
def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
r"""Fills the input Tensor with values drawn from a truncated
normal distribution. The values are effectively drawn from the
normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
with values outside :math:`[a, b]` redrawn until they are within
the bounds. The method used for generating the random values works
best when :math:`a \leq \text{mean} \leq b`.
Args:
tensor: an n-dimensional `torch.Tensor`
mean: the mean of the normal distribution
std: the standard deviation of the normal distribution
a: the minimum cutoff value
b: the maximum cutoff value
Examples:
>>> w = torch.empty(3, 5)
        >>> trunc_normal_(w)
"""
return _no_grad_trunc_normal_(tensor, mean, std, a, b)
def variance_scaling_(tensor, scale=1.0, mode="fan_in", distribution="normal"):
fan_in, fan_out = _calculate_fan_in_and_fan_out(tensor)
if mode == "fan_in":
denom = fan_in
elif mode == "fan_out":
denom = fan_out
    elif mode == "fan_avg":
        denom = (fan_in + fan_out) / 2
    else:
        raise ValueError(f"invalid mode {mode}")
variance = scale / denom
if distribution == "truncated_normal":
# constant is stddev of standard normal truncated to (-2, 2)
trunc_normal_(tensor, std=math.sqrt(variance) / 0.87962566103423978)
elif distribution == "normal":
tensor.normal_(std=math.sqrt(variance))
elif distribution == "uniform":
bound = math.sqrt(3 * variance)
tensor.uniform_(-bound, bound)
else:
raise ValueError(f"invalid distribution {distribution}")
def lecun_normal_(tensor):
variance_scaling_(tensor, mode="fan_in", distribution="truncated_normal")
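# A minimal sketch of the initializers above: LeCun-normal is variance scaling
# with fan-in and a truncated normal, so each weight ends up with variance of
# roughly 1/fan_in:
#
#   w = torch.empty(256, 128)     # fan_in = 128
#   lecun_normal_(w)
#   w.var().item()                # roughly 1/128 ~= 0.0078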
def build_model_with_cfg(
model_cls,
variant: str,
pretrained: bool,
default_cfg: dict,
model_cfg=None,
feature_cfg=None,
pretrained_strict: bool = True,
pretrained_filter_fn=None,
pretrained_custom_load=False,
kwargs_filter=None,
**kwargs,
):
"""Build model with specified default_cfg and optional model_cfg
This helper fn aids in the construction of a model including:
    * handling default_cfg and associated pretrained weight loading
* passing through optional model_cfg for models with config based arch spec
* features_only model adaptation
* pruning config / model adaptation
Args:
model_cls (nn.Module): model class
variant (str): model variant name
pretrained (bool): load pretrained weights
default_cfg (dict): model's default pretrained/task config
model_cfg (Optional[Dict]): model's architecture config
        feature_cfg (Optional[Dict]): feature extraction adapter config
pretrained_strict (bool): load pretrained weights strictly
pretrained_filter_fn (Optional[Callable]): filter callable for pretrained weights
pretrained_custom_load (bool): use custom load fn, to load numpy or other non PyTorch weights
kwargs_filter (Optional[Tuple]): kwargs to filter before passing to model
**kwargs: model args passed through to model __init__
"""
    pruned = kwargs.pop("pruned", False)  # popped so it never reaches model __init__; pruning is unsupported here
features = False
feature_cfg = feature_cfg or {}
default_cfg = deepcopy(default_cfg) if default_cfg else {}
update_default_cfg_and_kwargs(default_cfg, kwargs, kwargs_filter)
default_cfg.setdefault("architecture", variant)
    # Feature extraction setup: in this trimmed helper the flag only zeroes num_classes_pretrained below
if kwargs.pop("features_only", False):
features = True
feature_cfg.setdefault("out_indices", (0, 1, 2, 3, 4))
if "out_indices" in kwargs:
feature_cfg["out_indices"] = kwargs.pop("out_indices")
# Build the model
model = (
model_cls(**kwargs) if model_cfg is None else model_cls(cfg=model_cfg, **kwargs)
)
model.default_cfg = default_cfg
# For classification models, check class attr, then kwargs, then default to 1k, otherwise 0 for feats
num_classes_pretrained = (
0
if features
else getattr(model, "num_classes", kwargs.get("num_classes", 1000))
)
if pretrained:
assert not pretrained_custom_load, "URL should not contain npz for PASST models"
load_pretrained(
model,
num_classes=num_classes_pretrained,
in_chans=kwargs.get("in_chans", 3),
filter_fn=pretrained_filter_fn,
strict=pretrained_strict,
)
return model
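# A minimal usage sketch (PaSST here is a hypothetical stand-in for the model
# class defined elsewhere; the cfg values are illustrative):
#
#   def _create_passt(variant, pretrained=False, **kwargs):
#       cfg = {"url": "", "num_classes": 527, "input_size": (1, 128, 998),
#              "first_conv": "patch_embed.proj", "classifier": "head"}
#       return build_model_with_cfg(PaSST, variant, pretrained, default_cfg=cfg, **kwargs)
#
#   model = _create_passt("passt_s", pretrained=False, num_classes=50)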