SMARTIES-v1-ViT-L / smarties_model.py
gsumbul's picture
Hugging Face transformers model files
427561b verified
from functools import partial
import torch
import torch.nn as nn
from transformers import PreTrainedModel
from transformers.utils import cached_file
from .smarties_config import SMARTIESConfig
from functools import partial
import numpy as np
from timm.models.vision_transformer import Block
import os
import yaml
class SpectrumRangeProjection(nn.Module):
"""Patch Embedding of a sensor without patchify"""
def __init__(
self,
spectral_range,
spectrum_spec,
patch_size,
embed_dim,
bias=True
):
super().__init__()
self.spectral_range = spectral_range
self.name = spectrum_spec['name']
self.min_wavelength = spectrum_spec['min_wavelength']
self.max_wavelength = spectrum_spec['max_wavelength']
self.sensors = spectrum_spec['sensors']
self.nb_pixels = patch_size**2
self.proj = nn.Linear(self.nb_pixels, embed_dim, bias=bias)
def forward(self, x):
return self.proj(x.view(-1, self.nb_pixels))
class SpectrumRangeProjectionAvg(nn.Module):
"""Patch Embedding of a sensor without patchify"""
def __init__(
self,
spectrum_projections,
spectrum_spec,
embed_dim
):
super().__init__()
self.min_wavelength = spectrum_spec['min_wavelength']
self.max_wavelength = spectrum_spec['max_wavelength']
self.central_lambda = 0.5*(float(self.min_wavelength) + float(self.max_wavelength))
self.spectrum_projections = spectrum_projections
self.weights = []
for spectrum_proj in self.spectrum_projections:
central_lambda = 0.5*(float(spectrum_proj.min_wavelength) + float(spectrum_proj.max_wavelength))
self.weights.append(abs(self.central_lambda-central_lambda))
self.weights = np.array(self.weights) / sum(self.weights)
self.embed_dim = embed_dim
def forward(self, x):
out = 0. #torch.zeros((len(x),self.embed_dim))
for i, spectrum_proj in enumerate(self.spectrum_projections):
out += spectrum_proj(x) * self.weights[i]
return out
class SpectrumAwareProjection(nn.Module):
"""Patch Embedding of a sensor without patchify"""
def __init__(
self,
spectrum_specs,
patch_size,
embed_dim,
bias=True
):
super().__init__()
self.nb_pixels = patch_size**2
self.spectrum_embeds = torch.nn.ModuleList()
for spectral_range in sorted(spectrum_specs,key=lambda key:spectrum_specs[key]['projection_idx']):
if ((spectrum_specs[spectral_range]['projection_idx'] != -1) and (len(spectrum_specs[spectral_range]['agg_projections']) == 0)) :
self.spectrum_embeds.append(SpectrumRangeProjection(
spectral_range, spectrum_specs[spectral_range], patch_size, embed_dim
))
for spectral_range in sorted(spectrum_specs,key=lambda key:spectrum_specs[key]['projection_idx']):
if ((spectrum_specs[spectral_range]['projection_idx'] != -1) and (len(spectrum_specs[spectral_range]['agg_projections']) > 0)):
self.spectrum_embeds.append(
SpectrumRangeProjectionAvg(
[self.spectrum_embeds[agg_proj_idx] for agg_proj_idx in spectrum_specs[spectral_range]['agg_projections']],
spectrum_specs[spectral_range],
embed_dim))
def forward(self, x, projection_idx):
return self.spectrum_embeds[projection_idx](x)
# --------------------------------------------------------
# 2D sine-cosine position embedding
# References:
# Transformer: https://github.com/tensorflow/models/blob/master/official/nlp/transformer/model_utils.py
# MoCo v3: https://github.com/facebookresearch/moco-v3
# --------------------------------------------------------
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False):
"""
grid_size: int of the grid height and width
return:
pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token)
"""
grid_h = np.arange(grid_size, dtype=float)
grid_w = np.arange(grid_size, dtype=float)
grid = np.meshgrid(grid_w, grid_h) # here w goes first
grid = np.stack(grid, axis=0)
grid = grid.reshape([2, 1, grid_size, grid_size])
pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
if cls_token:
pos_embed = np.concatenate([np.zeros([1, embed_dim]), pos_embed], axis=0)
return pos_embed
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
assert embed_dim % 2 == 0
# use half of dimensions to encode grid_h
emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0]) # (H*W, D/2)
emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1]) # (H*W, D/2)
emb = np.concatenate([emb_h, emb_w], axis=1) # (H*W, D)
return emb
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
"""
embed_dim: output dimension for each position
pos: a list of positions to be encoded: size (M,)
out: (M, D)
"""
assert embed_dim % 2 == 0
omega = np.arange(embed_dim // 2, dtype=float)
omega /= embed_dim / 2.0
omega = 1.0 / 10000**omega # (D/2,)
pos = pos.reshape(-1) # (M,)
out = np.einsum("m,d->md", pos, omega) # (M, D/2), outer product
emb_sin = np.sin(out) # (M, D/2)
emb_cos = np.cos(out) # (M, D/2)
emb = np.concatenate([emb_sin, emb_cos], axis=1) # (M, D)
return emb
def get_dtype(mixed_precision):
if mixed_precision == 'no':
return torch.float32
elif mixed_precision == 'bf16':
return torch.bfloat16
elif mixed_precision == 'fp16':
return torch.float16
else:
raise NotImplementedError
class SMARTIESHF(PreTrainedModel):
config_class = SMARTIESConfig
def __init__(self, config: SMARTIESConfig):
super().__init__(config)
try:
if config.spectrum_specs is None:
spectrum_path = cached_file(
config.name_or_path,
"spectrum_specs.yaml"
)
with open(spectrum_path, "r") as f:
config.spectrum_specs = yaml.safe_load(f)
except Exception as e:
raise RuntimeError(
"spectrum_specs couldn't be loaded from spectrum_specs.yaml. " \
"Please load yaml file yourself and provide the argument spectrum_specs with the loaded file."
) from e
self.model_dtype = get_dtype(config.mixed_precision)
self.embed_dim = config.embed_dim
self.decoder_embed_dim = config.decoder_embed_dim
self.projection_conversion = {i: config.spectrum_specs[i]['projection_idx'] for i in config.spectrum_specs}
self.sensor_band_specs = {
'S2': [
'aerosol',
'blue_1',
'green_2',
'red_2',
'red_edge_1',
'red_edge_2',
'near_infrared_2',
'near_infrared_1',
'near_infrared_3',
'short_wave_infrared_1',
'short_wave_infrared_3',
'short_wave_infrared_4'
],
'S1': [
'microwave_1',
'microwave_2'
],
'RGB': [
'red_1',
'green_1',
'blue_3'
]
}
self.sensor_projection_specs = {}
for sensor_name in self.sensor_band_specs:
self.sensor_projection_specs[sensor_name] = np.array(
[self.projection_conversion[i] for i in self.sensor_band_specs[sensor_name]])
self.patch_size = config.patch_size
self.pos_drop = nn.Dropout(p=config.pos_drop_rate)
self.nb_patch_length = int(config.img_size / self.patch_size)
self.num_patches = self.nb_patch_length**2
self.cls_token = nn.Parameter(torch.zeros(1, 1, self.embed_dim))
self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches + 1, self.embed_dim), requires_grad=False) # fixed sin-cos embedding
self.spectrum_projection = SpectrumAwareProjection(
spectrum_specs=config.spectrum_specs,
patch_size=self.patch_size,
embed_dim=self.embed_dim
)
pos_embed = get_2d_sincos_pos_embed(
self.pos_embed.shape[-1],
self.nb_patch_length,
cls_token=True,
)
self.pos_embed.data.copy_(torch.from_numpy(pos_embed).float().unsqueeze(0))
self.projection_scaler = 12
self.norm_layer = partial(nn.LayerNorm, eps=config.norm_layer_eps)
self.blocks = nn.ModuleList([
Block(self.embed_dim, config.num_heads, config.mlp_ratio, qkv_bias=config.qkv_bias, norm_layer=self.norm_layer)
for i in range(config.depth)])
self.norm = self.norm_layer(self.embed_dim)
self.global_pool = config.global_pool
if self.global_pool:
self.fc_norm = self.norm_layer(self.embed_dim)
# decoder specifics
self.decoder_embed = nn.Linear(self.embed_dim, self.decoder_embed_dim, bias=True)
self.mask_token = nn.Parameter(torch.zeros(1, 1, self.decoder_embed_dim))
self.decoder_pos_embed = nn.Parameter(torch.zeros(1, self.num_patches + 1, self.decoder_embed_dim), requires_grad=False) # fixed sin-cos embedding
self.projection_scaler = 12
self.decoder_blocks = nn.ModuleList([
Block(self.decoder_embed_dim, config.decoder_num_heads, config.mlp_ratio, qkv_bias=True, norm_layer=self.norm_layer)
for i in range(config.decoder_depth)])
self.decoder_norm = self.norm_layer(self.decoder_embed_dim)
self.decoder_preds = torch.nn.ModuleList()
for band_idx in sorted(config.spectrum_specs, key=lambda key: config.spectrum_specs[key]['projection_idx']):
if ((config.spectrum_specs[band_idx]['projection_idx'] != -1) and (len(config.spectrum_specs[band_idx]['agg_projections']) == 0)):
self.decoder_preds.append(nn.Linear(self.decoder_embed_dim, self.patch_size**2, bias=True))
def tensor_patchify(self, imgs):
"""
imgs: (N, nb_bands, H, W)
x: (N, L, patch_size**2 *nb_bands)
"""
p = self.patch_size
assert imgs.shape[2] == imgs.shape[3] and imgs.shape[2] % p == 0
h = w = imgs.shape[2] // p
x = imgs.reshape(shape=(imgs.shape[0], imgs.shape[1], h, p, w, p))
x = torch.einsum('nchpwq->nhwpqc', x)
x = x.reshape(shape=(imgs.shape[0], h, w, p, p, imgs.shape[1])).permute(0,1,2,5,3,4)
return x
def forward_encoder(self, imgs, proj_indices, is_patchify, all_tokens):
if is_patchify:
img_patches = self.tensor_patchify(imgs)
else:
img_patches = imgs
B, nb_patch_h, nb_patch_w, nb_bands, _, _ = img_patches.shape
device = img_patches.device
img_spectrum_embeds = torch.zeros((B, nb_patch_h, nb_patch_w, nb_bands, self.embed_dim), device=device, dtype=self.model_dtype)
for projection_idx in torch.unbind(torch.unique(proj_indices)):
mask = (proj_indices==projection_idx)
img_spectrum_embeds[mask] = self.spectrum_projection(img_patches[mask], projection_idx)
img_embeddings = self.projection_scaler*img_spectrum_embeds.mean(dim=3)
img_embeddings = img_embeddings.reshape(-1,nb_patch_h*nb_patch_w,self.embed_dim)
cls_tokens = self.cls_token.expand(
B, -1, -1
)
x = torch.cat((cls_tokens, img_embeddings), dim=1)
x = x + self.pos_embed
x = self.pos_drop(x)
for blk in self.blocks:
x = blk(x)
if all_tokens:
return self.norm(x) # B, L, embed_dim (L=1+patch_size**2)
if self.global_pool:
x = x[:, 1:, :].mean(dim=1)
outcome = self.fc_norm(x)
else:
x = self.norm(x)
outcome = x[:, 0]
return outcome
def forward(self, imgs, is_patchify=True, sensor_type='S2', bands=None, proj_indices=None, all_tokens=False):
if proj_indices is None:
if bands is None:
assert sensor_type in self.sensor_band_specs.keys(), f"Sensor type {sensor_type} not recognized. Available types: {list(self.sensor_band_specs.keys())}. Otherwise provide bands."
proj_indices = self.sensor_projection_specs[sensor_type]
else:
proj_indices = []
for i in bands:
if i in self.projection_conversion.keys():
proj_indices.append(self.projection_conversion[i])
assert len(proj_indices) > 0, \
"No valid bands provided. Please check the bands to be aligned with the spectrum_specs definition \
(default version can be accessed at https://github.com/gsumbul/SMARTIES/blob/main/config/electromagnetic_spectrum.yaml)."
proj_indices = np.array(proj_indices)
proj_indices = torch.as_tensor(np.tile(proj_indices.reshape(
1,1,1,-1), (imgs.shape[0], self.nb_patch_length, self.nb_patch_length, 1)).astype(np.int32), device=imgs.device)
return self.forward_encoder(imgs, proj_indices, is_patchify=is_patchify, all_tokens=all_tokens)