import os
from functools import partial

import numpy as np
import torch
import torch.nn as nn
import yaml
from timm.models.vision_transformer import Block
from transformers import PreTrainedModel
from transformers.utils import cached_file

from .smarties_config import SMARTIESConfig


class SpectrumRangeProjection(nn.Module):
    """Linear projection of single-band patches belonging to one spectral range.

    The module does not patchify: ``forward`` flattens whatever it receives to
    rows of ``patch_size**2`` values and applies one ``nn.Linear``.

    Args:
        spectral_range: Key of this range in the spectrum specification.
        spectrum_spec: Mapping with the keys ``name``, ``min_wavelength``,
            ``max_wavelength`` and ``sensors``.
        patch_size: Side length of a (square) patch in pixels.
        embed_dim: Output dimension of the projection.
        bias: Whether the linear layer has a bias term.
    """

    def __init__(self, spectral_range, spectrum_spec, patch_size, embed_dim, bias=True):
        super().__init__()
        self.spectral_range = spectral_range
        self.name = spectrum_spec['name']
        self.min_wavelength = spectrum_spec['min_wavelength']
        self.max_wavelength = spectrum_spec['max_wavelength']
        self.sensors = spectrum_spec['sensors']
        self.nb_pixels = patch_size**2
        self.proj = nn.Linear(self.nb_pixels, embed_dim, bias=bias)

    def forward(self, x):
        """Flatten ``x`` to ``(-1, patch_size**2)`` and project to ``embed_dim``."""
        # reshape (instead of view) also accepts non-contiguous inputs.
        return self.proj(x.reshape(-1, self.nb_pixels))


class SpectrumRangeProjectionAvg(nn.Module):
    """Weighted sum of existing spectral-range projections.

    Each source projection is weighted by the absolute distance between its
    central wavelength and the central wavelength of ``spectrum_spec``; the
    weights are normalised to sum to one.

    NOTE(review): the weight grows with spectral distance, i.e. farther ranges
    contribute more. That looks inverted, but it is kept as is because changing
    it would alter the outputs of existing checkpoints -- confirm intent.

    Args:
        spectrum_projections: List of modules exposing ``min_wavelength`` and
            ``max_wavelength`` and callable on the patch tensor.
        spectrum_spec: Mapping with ``min_wavelength`` and ``max_wavelength``.
        embed_dim: Embedding dimension (stored, not otherwise used here).
    """

    def __init__(self, spectrum_projections, spectrum_spec, embed_dim):
        super().__init__()
        self.min_wavelength = spectrum_spec['min_wavelength']
        self.max_wavelength = spectrum_spec['max_wavelength']
        self.central_lambda = 0.5 * (float(self.min_wavelength) + float(self.max_wavelength))
        # Kept as a plain Python list, so nn.Module does not register these as
        # sub-modules of this object.
        # NOTE(review): presumably intentional so the shared projections are
        # not duplicated in the state_dict -- confirm.
        self.spectrum_projections = spectrum_projections

        distances = []
        for spectrum_proj in self.spectrum_projections:
            central_lambda = 0.5 * (
                float(spectrum_proj.min_wavelength) + float(spectrum_proj.max_wavelength)
            )
            distances.append(abs(self.central_lambda - central_lambda))

        total = sum(distances)
        if total > 0:
            self.weights = np.array(distances) / total
        else:
            # All sources share the target's central wavelength (or there are
            # none): dividing by zero would yield NaN weights, so fall back to
            # a uniform average.
            self.weights = np.full(len(distances), 1.0 / max(len(distances), 1))
        self.embed_dim = embed_dim

    def forward(self, x):
        """Return the weighted sum of all source projections applied to ``x``."""
        out = 0.
        for spectrum_proj, weight in zip(self.spectrum_projections, self.weights):
            out += spectrum_proj(x) * weight
        return out


class SpectrumAwareProjection(nn.Module):
    """Collection of per-spectral-range projections selected by index.

    ``spectrum_embeds`` first receives one :class:`SpectrumRangeProjection` per
    range that has ``projection_idx != -1`` and an empty ``agg_projections``
    list (in ascending ``projection_idx`` order), followed by one
    :class:`SpectrumRangeProjectionAvg` per range with a non-empty
    ``agg_projections`` list. ``agg_projections`` entries are used as positions
    in ``spectrum_embeds``.

    NOTE(review): this assumes each range's ``projection_idx`` equals its
    position in ``spectrum_embeds`` -- verify against the spec file.

    Args:
        spectrum_specs: Mapping ``range_key -> spec`` where each spec has at
            least ``projection_idx`` and ``agg_projections``.
        patch_size: Side length of a (square) patch in pixels.
        embed_dim: Output dimension of every projection.
        bias: Whether the linear projections have a bias term.
    """

    def __init__(self, spectrum_specs, patch_size, embed_dim, bias=True):
        super().__init__()
        self.nb_pixels = patch_size**2
        self.spectrum_embeds = torch.nn.ModuleList()

        ordered_ranges = sorted(
            spectrum_specs, key=lambda key: spectrum_specs[key]['projection_idx']
        )
        active_ranges = [
            key for key in ordered_ranges if spectrum_specs[key]['projection_idx'] != -1
        ]

        # Pass 1: ranges with their own learnable projection.
        for spectral_range in active_ranges:
            spec = spectrum_specs[spectral_range]
            if len(spec['agg_projections']) == 0:
                self.spectrum_embeds.append(
                    SpectrumRangeProjection(
                        spectral_range, spec, patch_size, embed_dim, bias=bias
                    )
                )

        # Pass 2: ranges expressed as a weighted sum of already-built projections.
        for spectral_range in active_ranges:
            spec = spectrum_specs[spectral_range]
            if len(spec['agg_projections']) > 0:
                sources = [
                    self.spectrum_embeds[agg_proj_idx]
                    for agg_proj_idx in spec['agg_projections']
                ]
                self.spectrum_embeds.append(
                    SpectrumRangeProjectionAvg(sources, spec, embed_dim)
                )

    def forward(self, x, projection_idx):
        """Apply the projection stored at position ``projection_idx`` to ``x``."""
        return self.spectrum_embeds[projection_idx](x)


# --------------------------------------------------------
# 2D sine-cosine position embedding
# References:
# Transformer: https://github.com/tensorflow/models/blob/master/official/nlp/transformer/model_utils.py
# MoCo v3: https://github.com/facebookresearch/moco-v3
# --------------------------------------------------------
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False):
    """Build a fixed 2D sine-cosine position embedding for a square grid.

    Args:
        embed_dim: Embedding dimension (must be divisible by 4, see helpers).
        grid_size: Int, height and width of the grid.
        cls_token: If True, prepend one all-zero row for the class token.

    Returns:
        ``np.ndarray`` of shape ``[grid_size*grid_size, embed_dim]`` or
        ``[1 + grid_size*grid_size, embed_dim]`` when ``cls_token`` is True.
    """
    grid_h = np.arange(grid_size, dtype=float)
    grid_w = np.arange(grid_size, dtype=float)
    grid = np.meshgrid(grid_w, grid_h)  # here w goes first
    grid = np.stack(grid, axis=0)
    grid = grid.reshape([2, 1, grid_size, grid_size])

    pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
    if cls_token:
        pos_embed = np.concatenate([np.zeros([1, embed_dim]), pos_embed], axis=0)
    return pos_embed


def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
    """Encode a 2-channel coordinate grid; each channel gets ``embed_dim // 2`` dims.

    Returns an array of shape ``(H*W, embed_dim)``.
    """
    assert embed_dim % 2 == 0

    # Half of the dimensions encode grid[0], the other half grid[1].
    # grid[0] is the first meshgrid output, i.e. the `grid_w` coordinates, even
    # though the variable below is historically named `emb_h`.
    emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0])  # (H*W, D/2)
    emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1])  # (H*W, D/2)

    emb = np.concatenate([emb_h, emb_w], axis=1)  # (H*W, D)
    return emb


def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
    """Sine-cosine encoding of a set of scalar positions.

    Args:
        embed_dim: Output dimension for each position (must be even).
        pos: Array of positions to be encoded; flattened to size ``(M,)``.

    Returns:
        ``np.ndarray`` of shape ``(M, embed_dim)``: sines in the first half of
        the columns, cosines in the second half.
    """
    assert embed_dim % 2 == 0
    omega = np.arange(embed_dim // 2, dtype=float)
    omega /= embed_dim / 2.0
    omega = 1.0 / 10000**omega  # (D/2,)

    pos = pos.reshape(-1)  # (M,)
    out = np.einsum("m,d->md", pos, omega)  # (M, D/2), outer product

    emb_sin = np.sin(out)  # (M, D/2)
    emb_cos = np.cos(out)  # (M, D/2)

    emb = np.concatenate([emb_sin, emb_cos], axis=1)  # (M, D)
    return emb


def get_dtype(mixed_precision):
    """Map a mixed-precision flag (``'no'``, ``'bf16'``, ``'fp16'``) to a torch dtype.

    Raises:
        NotImplementedError: For any other flag value.
    """
    if mixed_precision == 'no':
        return torch.float32
    elif mixed_precision == 'bf16':
        return torch.bfloat16
    elif mixed_precision == 'fp16':
        return torch.float16
    else:
        raise NotImplementedError(
            f"Unsupported mixed_precision value: {mixed_precision!r}"
        )


class SMARTIESHF(PreTrainedModel):
    """HuggingFace wrapper of the SMARTIES spectrum-aware ViT encoder.

    Every band of every patch is embedded by the projection of its spectral
    range, the band embeddings of a patch are averaged and scaled, and the
    resulting token sequence is processed by a stack of timm ViT blocks.
    Decoder layers are constructed as well but are not used by ``forward``.
    """

    config_class = SMARTIESConfig

    def __init__(self, config: SMARTIESConfig):
        """Build encoder/decoder layers from ``config``.

        If ``config.spectrum_specs`` is None, ``spectrum_specs.yaml`` is
        resolved through ``cached_file(config.name_or_path, ...)`` and loaded.

        Raises:
            RuntimeError: If the spectrum specification cannot be loaded.
        """
        super().__init__(config)
        try:
            if config.spectrum_specs is None:
                spectrum_path = cached_file(config.name_or_path, "spectrum_specs.yaml")
                with open(spectrum_path, "r", encoding="utf-8") as f:
                    config.spectrum_specs = yaml.safe_load(f)
        except Exception as e:
            raise RuntimeError(
                "spectrum_specs couldn't be loaded from spectrum_specs.yaml. "
                "Please load yaml file yourself and provide the argument spectrum_specs with the loaded file."
            ) from e

        self.model_dtype = get_dtype(config.mixed_precision)
        self.embed_dim = config.embed_dim
        self.decoder_embed_dim = config.decoder_embed_dim

        # band name -> index of the projection that embeds it
        self.projection_conversion = {
            i: config.spectrum_specs[i]['projection_idx'] for i in config.spectrum_specs
        }
        # Ordered band names per supported sensor shortcut.
        self.sensor_band_specs = {
            'S2': [
                'aerosol',
                'blue_1',
                'green_2',
                'red_2',
                'red_edge_1',
                'red_edge_2',
                'near_infrared_2',
                'near_infrared_1',
                'near_infrared_3',
                'short_wave_infrared_1',
                'short_wave_infrared_3',
                'short_wave_infrared_4',
            ],
            'S1': [
                'microwave_1',
                'microwave_2',
            ],
            'RGB': [
                'red_1',
                'green_1',
                'blue_3',
            ],
        }
        self.sensor_projection_specs = {
            sensor_name: np.array([self.projection_conversion[i] for i in band_names])
            for sensor_name, band_names in self.sensor_band_specs.items()
        }

        self.patch_size = config.patch_size
        self.pos_drop = nn.Dropout(p=config.pos_drop_rate)
        self.nb_patch_length = int(config.img_size / self.patch_size)
        self.num_patches = self.nb_patch_length**2

        self.cls_token = nn.Parameter(torch.zeros(1, 1, self.embed_dim))
        self.pos_embed = nn.Parameter(
            torch.zeros(1, self.num_patches + 1, self.embed_dim), requires_grad=False
        )  # fixed sin-cos embedding

        self.spectrum_projection = SpectrumAwareProjection(
            spectrum_specs=config.spectrum_specs,
            patch_size=self.patch_size,
            embed_dim=self.embed_dim,
        )

        pos_embed = get_2d_sincos_pos_embed(
            self.pos_embed.shape[-1],
            self.nb_patch_length,
            cls_token=True,
        )
        self.pos_embed.data.copy_(torch.from_numpy(pos_embed).float().unsqueeze(0))

        # Factor applied to the band-averaged patch embedding in forward_encoder.
        self.projection_scaler = 12

        self.norm_layer = partial(nn.LayerNorm, eps=config.norm_layer_eps)
        self.blocks = nn.ModuleList([
            Block(
                self.embed_dim,
                config.num_heads,
                config.mlp_ratio,
                qkv_bias=config.qkv_bias,
                norm_layer=self.norm_layer,
            )
            for _ in range(config.depth)
        ])
        self.norm = self.norm_layer(self.embed_dim)

        self.global_pool = config.global_pool
        if self.global_pool:
            self.fc_norm = self.norm_layer(self.embed_dim)

        # decoder specifics
        self.decoder_embed = nn.Linear(self.embed_dim, self.decoder_embed_dim, bias=True)
        self.mask_token = nn.Parameter(torch.zeros(1, 1, self.decoder_embed_dim))
        # Allocated as zeros and not filled in this constructor.
        # NOTE(review): presumably populated when checkpoint weights are loaded -- confirm.
        self.decoder_pos_embed = nn.Parameter(
            torch.zeros(1, self.num_patches + 1, self.decoder_embed_dim),
            requires_grad=False,
        )  # fixed sin-cos embedding

        self.decoder_blocks = nn.ModuleList([
            Block(
                self.decoder_embed_dim,
                config.decoder_num_heads,
                config.mlp_ratio,
                qkv_bias=True,
                norm_layer=self.norm_layer,
            )
            for _ in range(config.decoder_depth)
        ])
        self.decoder_norm = self.norm_layer(self.decoder_embed_dim)

        # One reconstruction head per non-aggregated projection, in ascending
        # projection_idx order (same selection as SpectrumAwareProjection pass 1).
        self.decoder_preds = torch.nn.ModuleList()
        for band_idx in sorted(
            config.spectrum_specs,
            key=lambda key: config.spectrum_specs[key]['projection_idx'],
        ):
            spec = config.spectrum_specs[band_idx]
            if spec['projection_idx'] != -1 and len(spec['agg_projections']) == 0:
                self.decoder_preds.append(
                    nn.Linear(self.decoder_embed_dim, self.patch_size**2, bias=True)
                )

    def tensor_patchify(self, imgs):
        """Split square images into non-overlapping per-band patches.

        Args:
            imgs: Tensor of shape ``(N, nb_bands, H, W)`` with ``H == W`` and
                ``H`` divisible by ``patch_size``.

        Returns:
            Tensor of shape ``(N, H/p, W/p, nb_bands, p, p)``.
        """
        p = self.patch_size
        assert imgs.shape[2] == imgs.shape[3] and imgs.shape[2] % p == 0

        h = w = imgs.shape[2] // p
        x = imgs.reshape(shape=(imgs.shape[0], imgs.shape[1], h, p, w, p))
        # (n, c, h, p, w, q) -> (n, h, w, c, p, q)
        return x.permute(0, 2, 4, 1, 3, 5)

    def forward_encoder(self, imgs, proj_indices, is_patchify, all_tokens):
        """Embed patches per spectral range and run the transformer encoder.

        Args:
            imgs: ``(B, nb_bands, H, W)`` images when ``is_patchify`` is True,
                otherwise patches shaped ``(B, h, w, nb_bands, p, p)``.
            proj_indices: Integer tensor ``(B, h, w, nb_bands)`` giving the
                projection index of every band of every patch.
            is_patchify: Whether ``imgs`` still has to be patchified.
            all_tokens: If True, return the normalised full token sequence.

        Returns:
            ``(B, 1 + h*w, embed_dim)`` if ``all_tokens``; otherwise
            ``(B, embed_dim)`` -- the normalised mean of the patch tokens when
            ``global_pool`` is set, else the normalised class token.
        """
        img_patches = self.tensor_patchify(imgs) if is_patchify else imgs

        B, nb_patch_h, nb_patch_w, nb_bands, _, _ = img_patches.shape
        device = img_patches.device

        img_spectrum_embeds = torch.zeros(
            (B, nb_patch_h, nb_patch_w, nb_bands, self.embed_dim),
            device=device,
            dtype=self.model_dtype,
        )

        # Process all band-patches that share a projection in one batched call.
        for projection_idx in torch.unique(proj_indices).tolist():
            mask = proj_indices == projection_idx
            img_spectrum_embeds[mask] = self.spectrum_projection(
                img_patches[mask], projection_idx
            )

        # Average over the band axis, then rescale.
        img_embeddings = self.projection_scaler * img_spectrum_embeds.mean(dim=3)
        img_embeddings = img_embeddings.reshape(-1, nb_patch_h * nb_patch_w, self.embed_dim)

        cls_tokens = self.cls_token.expand(B, -1, -1)
        x = torch.cat((cls_tokens, img_embeddings), dim=1)
        x = x + self.pos_embed
        x = self.pos_drop(x)

        for blk in self.blocks:
            x = blk(x)

        if all_tokens:
            return self.norm(x)  # B, L, embed_dim (L = 1 + number of patches)

        if self.global_pool:
            x = x[:, 1:, :].mean(dim=1)  # mean over patch tokens, class token excluded
            outcome = self.fc_norm(x)
        else:
            x = self.norm(x)
            outcome = x[:, 0]
        return outcome

    def forward(self, imgs, is_patchify=True, sensor_type='S2', bands=None, proj_indices=None, all_tokens=False):
        """Encode ``imgs``; band-to-projection mapping is resolved from the arguments.

        Resolution order: explicit ``proj_indices``, else ``bands`` (names
        looked up in the spectrum specification; unknown names are skipped),
        else the predefined band list of ``sensor_type``.

        Args:
            imgs: Input images or patches (see ``forward_encoder``).
            is_patchify: Whether ``imgs`` still has to be patchified.
            sensor_type: Key of ``sensor_band_specs`` (``'S2'``, ``'S1'``, ``'RGB'``).
            bands: Optional iterable of band names, one per input channel.
            proj_indices: Optional projection indices. A per-band sequence
                (list, array or tensor) is tiled over batch and patch grid; a
                4-D array/tensor is assumed to already be
                ``(B, h, w, nb_bands)`` and is used as is.
            all_tokens: Forwarded to ``forward_encoder``.

        Returns:
            Output of ``forward_encoder``.
        """
        if proj_indices is None:
            if bands is None:
                assert sensor_type in self.sensor_band_specs.keys(), f"Sensor type {sensor_type} not recognized. Available types: {list(self.sensor_band_specs.keys())}. Otherwise provide bands."
                proj_indices = self.sensor_projection_specs[sensor_type]
            else:
                proj_indices = [
                    self.projection_conversion[band]
                    for band in bands
                    if band in self.projection_conversion
                ]
                assert len(proj_indices) > 0, (
                    "No valid bands provided. Please check the bands to be aligned with the spectrum_specs definition "
                    "(default version can be accessed at https://github.com/gsumbul/SMARTIES/blob/main/config/electromagnetic_spectrum.yaml)."
                )

        already_full_tensor = isinstance(proj_indices, torch.Tensor) and proj_indices.dim() == 4
        if not already_full_tensor:
            if isinstance(proj_indices, torch.Tensor):
                # NumPy cannot read device tensors directly.
                proj_indices = proj_indices.detach().cpu().numpy()
            proj_indices = np.asarray(proj_indices)
            if proj_indices.ndim != 4:
                # Broadcast the per-band indices to every patch of every sample.
                proj_indices = np.tile(
                    proj_indices.reshape(1, 1, 1, -1),
                    (imgs.shape[0], self.nb_patch_length, self.nb_patch_length, 1),
                ).astype(np.int32)
        proj_indices = torch.as_tensor(proj_indices, device=imgs.device)

        return self.forward_encoder(imgs, proj_indices, is_patchify=is_patchify, all_tokens=all_tokens)