|
from functools import partial |
|
import torch |
|
import torch.nn as nn |
|
from transformers import PreTrainedModel |
|
from transformers.utils import cached_file |
|
from .smarties_config import SMARTIESConfig |
|
from functools import partial |
|
import numpy as np |
|
from timm.models.vision_transformer import Block |
|
import os |
|
import yaml |
|
|
|
class SpectrumRangeProjection(nn.Module): |
|
"""Patch Embedding of a sensor without patchify""" |
|
def __init__( |
|
self, |
|
spectral_range, |
|
spectrum_spec, |
|
patch_size, |
|
embed_dim, |
|
bias=True |
|
): |
|
super().__init__() |
|
self.spectral_range = spectral_range |
|
self.name = spectrum_spec['name'] |
|
self.min_wavelength = spectrum_spec['min_wavelength'] |
|
self.max_wavelength = spectrum_spec['max_wavelength'] |
|
self.sensors = spectrum_spec['sensors'] |
|
self.nb_pixels = patch_size**2 |
|
self.proj = nn.Linear(self.nb_pixels, embed_dim, bias=bias) |
|
|
|
def forward(self, x): |
|
return self.proj(x.view(-1, self.nb_pixels)) |
|
|
|
class SpectrumRangeProjectionAvg(nn.Module): |
|
"""Patch Embedding of a sensor without patchify""" |
|
def __init__( |
|
self, |
|
spectrum_projections, |
|
spectrum_spec, |
|
embed_dim |
|
): |
|
super().__init__() |
|
self.min_wavelength = spectrum_spec['min_wavelength'] |
|
self.max_wavelength = spectrum_spec['max_wavelength'] |
|
self.central_lambda = 0.5*(float(self.min_wavelength) + float(self.max_wavelength)) |
|
self.spectrum_projections = spectrum_projections |
|
self.weights = [] |
|
for spectrum_proj in self.spectrum_projections: |
|
central_lambda = 0.5*(float(spectrum_proj.min_wavelength) + float(spectrum_proj.max_wavelength)) |
|
self.weights.append(abs(self.central_lambda-central_lambda)) |
|
self.weights = np.array(self.weights) / sum(self.weights) |
|
self.embed_dim = embed_dim |
|
|
|
def forward(self, x): |
|
out = 0. |
|
for i, spectrum_proj in enumerate(self.spectrum_projections): |
|
out += spectrum_proj(x) * self.weights[i] |
|
return out |
|
|
|
|
|
class SpectrumAwareProjection(nn.Module): |
|
"""Patch Embedding of a sensor without patchify""" |
|
def __init__( |
|
self, |
|
spectrum_specs, |
|
patch_size, |
|
embed_dim, |
|
bias=True |
|
): |
|
super().__init__() |
|
self.nb_pixels = patch_size**2 |
|
|
|
self.spectrum_embeds = torch.nn.ModuleList() |
|
for spectral_range in sorted(spectrum_specs,key=lambda key:spectrum_specs[key]['projection_idx']): |
|
if ((spectrum_specs[spectral_range]['projection_idx'] != -1) and (len(spectrum_specs[spectral_range]['agg_projections']) == 0)) : |
|
self.spectrum_embeds.append(SpectrumRangeProjection( |
|
spectral_range, spectrum_specs[spectral_range], patch_size, embed_dim |
|
)) |
|
|
|
for spectral_range in sorted(spectrum_specs,key=lambda key:spectrum_specs[key]['projection_idx']): |
|
if ((spectrum_specs[spectral_range]['projection_idx'] != -1) and (len(spectrum_specs[spectral_range]['agg_projections']) > 0)): |
|
self.spectrum_embeds.append( |
|
SpectrumRangeProjectionAvg( |
|
[self.spectrum_embeds[agg_proj_idx] for agg_proj_idx in spectrum_specs[spectral_range]['agg_projections']], |
|
spectrum_specs[spectral_range], |
|
embed_dim)) |
|
|
|
def forward(self, x, projection_idx): |
|
return self.spectrum_embeds[projection_idx](x) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False): |
|
""" |
|
grid_size: int of the grid height and width |
|
return: |
|
pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token) |
|
""" |
|
grid_h = np.arange(grid_size, dtype=float) |
|
grid_w = np.arange(grid_size, dtype=float) |
|
grid = np.meshgrid(grid_w, grid_h) |
|
grid = np.stack(grid, axis=0) |
|
|
|
grid = grid.reshape([2, 1, grid_size, grid_size]) |
|
pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid) |
|
if cls_token: |
|
pos_embed = np.concatenate([np.zeros([1, embed_dim]), pos_embed], axis=0) |
|
return pos_embed |
|
|
|
|
|
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid): |
|
assert embed_dim % 2 == 0 |
|
|
|
|
|
emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0]) |
|
emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1]) |
|
|
|
emb = np.concatenate([emb_h, emb_w], axis=1) |
|
return emb |
|
|
|
|
|
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos): |
|
""" |
|
embed_dim: output dimension for each position |
|
pos: a list of positions to be encoded: size (M,) |
|
out: (M, D) |
|
""" |
|
assert embed_dim % 2 == 0 |
|
omega = np.arange(embed_dim // 2, dtype=float) |
|
omega /= embed_dim / 2.0 |
|
omega = 1.0 / 10000**omega |
|
|
|
pos = pos.reshape(-1) |
|
out = np.einsum("m,d->md", pos, omega) |
|
|
|
emb_sin = np.sin(out) |
|
emb_cos = np.cos(out) |
|
|
|
emb = np.concatenate([emb_sin, emb_cos], axis=1) |
|
return emb |
|
|
|
def get_dtype(mixed_precision): |
|
if mixed_precision == 'no': |
|
return torch.float32 |
|
elif mixed_precision == 'bf16': |
|
return torch.bfloat16 |
|
elif mixed_precision == 'fp16': |
|
return torch.float16 |
|
else: |
|
raise NotImplementedError |
|
|
|
class SMARTIESHF(PreTrainedModel): |
|
config_class = SMARTIESConfig |
|
def __init__(self, config: SMARTIESConfig): |
|
super().__init__(config) |
|
try: |
|
if config.spectrum_specs is None: |
|
spectrum_path = cached_file( |
|
config.name_or_path, |
|
"spectrum_specs.yaml" |
|
) |
|
with open(spectrum_path, "r") as f: |
|
config.spectrum_specs = yaml.safe_load(f) |
|
except Exception as e: |
|
raise RuntimeError( |
|
"spectrum_specs couldn't be loaded from spectrum_specs.yaml. " \ |
|
"Please load yaml file yourself and provide the argument spectrum_specs with the loaded file." |
|
) from e |
|
self.model_dtype = get_dtype(config.mixed_precision) |
|
self.embed_dim = config.embed_dim |
|
self.decoder_embed_dim = config.decoder_embed_dim |
|
self.projection_conversion = {i: config.spectrum_specs[i]['projection_idx'] for i in config.spectrum_specs} |
|
self.sensor_band_specs = { |
|
'S2': [ |
|
'aerosol', |
|
'blue_1', |
|
'green_2', |
|
'red_2', |
|
'red_edge_1', |
|
'red_edge_2', |
|
'near_infrared_2', |
|
'near_infrared_1', |
|
'near_infrared_3', |
|
'short_wave_infrared_1', |
|
'short_wave_infrared_3', |
|
'short_wave_infrared_4' |
|
], |
|
'S1': [ |
|
'microwave_1', |
|
'microwave_2' |
|
], |
|
'RGB': [ |
|
'red_1', |
|
'green_1', |
|
'blue_3' |
|
] |
|
} |
|
self.sensor_projection_specs = {} |
|
for sensor_name in self.sensor_band_specs: |
|
self.sensor_projection_specs[sensor_name] = np.array( |
|
[self.projection_conversion[i] for i in self.sensor_band_specs[sensor_name]]) |
|
|
|
self.patch_size = config.patch_size |
|
self.pos_drop = nn.Dropout(p=config.pos_drop_rate) |
|
self.nb_patch_length = int(config.img_size / self.patch_size) |
|
self.num_patches = self.nb_patch_length**2 |
|
|
|
self.cls_token = nn.Parameter(torch.zeros(1, 1, self.embed_dim)) |
|
self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches + 1, self.embed_dim), requires_grad=False) |
|
|
|
self.spectrum_projection = SpectrumAwareProjection( |
|
spectrum_specs=config.spectrum_specs, |
|
patch_size=self.patch_size, |
|
embed_dim=self.embed_dim |
|
) |
|
|
|
pos_embed = get_2d_sincos_pos_embed( |
|
self.pos_embed.shape[-1], |
|
self.nb_patch_length, |
|
cls_token=True, |
|
) |
|
self.pos_embed.data.copy_(torch.from_numpy(pos_embed).float().unsqueeze(0)) |
|
self.projection_scaler = 12 |
|
self.norm_layer = partial(nn.LayerNorm, eps=config.norm_layer_eps) |
|
|
|
self.blocks = nn.ModuleList([ |
|
Block(self.embed_dim, config.num_heads, config.mlp_ratio, qkv_bias=config.qkv_bias, norm_layer=self.norm_layer) |
|
for i in range(config.depth)]) |
|
self.norm = self.norm_layer(self.embed_dim) |
|
self.global_pool = config.global_pool |
|
if self.global_pool: |
|
self.fc_norm = self.norm_layer(self.embed_dim) |
|
|
|
|
|
self.decoder_embed = nn.Linear(self.embed_dim, self.decoder_embed_dim, bias=True) |
|
|
|
self.mask_token = nn.Parameter(torch.zeros(1, 1, self.decoder_embed_dim)) |
|
|
|
self.decoder_pos_embed = nn.Parameter(torch.zeros(1, self.num_patches + 1, self.decoder_embed_dim), requires_grad=False) |
|
self.projection_scaler = 12 |
|
|
|
self.decoder_blocks = nn.ModuleList([ |
|
Block(self.decoder_embed_dim, config.decoder_num_heads, config.mlp_ratio, qkv_bias=True, norm_layer=self.norm_layer) |
|
for i in range(config.decoder_depth)]) |
|
|
|
self.decoder_norm = self.norm_layer(self.decoder_embed_dim) |
|
self.decoder_preds = torch.nn.ModuleList() |
|
for band_idx in sorted(config.spectrum_specs, key=lambda key: config.spectrum_specs[key]['projection_idx']): |
|
if ((config.spectrum_specs[band_idx]['projection_idx'] != -1) and (len(config.spectrum_specs[band_idx]['agg_projections']) == 0)): |
|
self.decoder_preds.append(nn.Linear(self.decoder_embed_dim, self.patch_size**2, bias=True)) |
|
|
|
def tensor_patchify(self, imgs): |
|
""" |
|
imgs: (N, nb_bands, H, W) |
|
x: (N, L, patch_size**2 *nb_bands) |
|
""" |
|
p = self.patch_size |
|
assert imgs.shape[2] == imgs.shape[3] and imgs.shape[2] % p == 0 |
|
|
|
h = w = imgs.shape[2] // p |
|
x = imgs.reshape(shape=(imgs.shape[0], imgs.shape[1], h, p, w, p)) |
|
x = torch.einsum('nchpwq->nhwpqc', x) |
|
x = x.reshape(shape=(imgs.shape[0], h, w, p, p, imgs.shape[1])).permute(0,1,2,5,3,4) |
|
return x |
|
|
|
def forward_encoder(self, imgs, proj_indices, is_patchify, all_tokens): |
|
if is_patchify: |
|
img_patches = self.tensor_patchify(imgs) |
|
else: |
|
img_patches = imgs |
|
B, nb_patch_h, nb_patch_w, nb_bands, _, _ = img_patches.shape |
|
device = img_patches.device |
|
|
|
img_spectrum_embeds = torch.zeros((B, nb_patch_h, nb_patch_w, nb_bands, self.embed_dim), device=device, dtype=self.model_dtype) |
|
|
|
for projection_idx in torch.unbind(torch.unique(proj_indices)): |
|
mask = (proj_indices==projection_idx) |
|
img_spectrum_embeds[mask] = self.spectrum_projection(img_patches[mask], projection_idx) |
|
|
|
img_embeddings = self.projection_scaler*img_spectrum_embeds.mean(dim=3) |
|
img_embeddings = img_embeddings.reshape(-1,nb_patch_h*nb_patch_w,self.embed_dim) |
|
|
|
cls_tokens = self.cls_token.expand( |
|
B, -1, -1 |
|
) |
|
x = torch.cat((cls_tokens, img_embeddings), dim=1) |
|
x = x + self.pos_embed |
|
x = self.pos_drop(x) |
|
|
|
for blk in self.blocks: |
|
x = blk(x) |
|
|
|
if all_tokens: |
|
return self.norm(x) |
|
|
|
if self.global_pool: |
|
x = x[:, 1:, :].mean(dim=1) |
|
outcome = self.fc_norm(x) |
|
else: |
|
x = self.norm(x) |
|
outcome = x[:, 0] |
|
|
|
return outcome |
|
|
|
def forward(self, imgs, is_patchify=True, sensor_type='S2', bands=None, proj_indices=None, all_tokens=False): |
|
if proj_indices is None: |
|
if bands is None: |
|
assert sensor_type in self.sensor_band_specs.keys(), f"Sensor type {sensor_type} not recognized. Available types: {list(self.sensor_band_specs.keys())}. Otherwise provide bands." |
|
proj_indices = self.sensor_projection_specs[sensor_type] |
|
else: |
|
proj_indices = [] |
|
for i in bands: |
|
if i in self.projection_conversion.keys(): |
|
proj_indices.append(self.projection_conversion[i]) |
|
assert len(proj_indices) > 0, \ |
|
"No valid bands provided. Please check the bands to be aligned with the spectrum_specs definition \ |
|
(default version can be accessed at https://github.com/gsumbul/SMARTIES/blob/main/config/electromagnetic_spectrum.yaml)." |
|
proj_indices = np.array(proj_indices) |
|
proj_indices = torch.as_tensor(np.tile(proj_indices.reshape( |
|
1,1,1,-1), (imgs.shape[0], self.nb_patch_length, self.nb_patch_length, 1)).astype(np.int32), device=imgs.device) |
|
|
|
return self.forward_encoder(imgs, proj_indices, is_patchify=is_patchify, all_tokens=all_tokens) |
|
|