import json
import math
import os
from statistics import mean, stdev

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
from scipy.stats import kendalltau
from sklearn.metrics import balanced_accuracy_score, confusion_matrix, mean_squared_error
from torch import nn
from torch.nn import functional as F

import utils
from utils import prediction2label

class ordinal_loss(nn.Module):
    """Ordinal regression with encoding as in https://arxiv.org/pdf/0704.1028.pdf"""

    def __init__(self, weight_class=None):
        super(ordinal_loss, self).__init__()
        self.weights = weight_class

    def forward(self, predictions, targets):
        # If the batch is empty there is nothing to average over.
        if predictions.shape[0] == 0:
            return 0
        # Build the cumulative ordinal encoding of the targets,
        # e.g. class 0 -> [1,0,0,...], class 2 -> [1,1,1,0,...].
        modified_target = torch.zeros_like(predictions)
        for i, target in enumerate(targets):
            modified_target[i, 0:target + 1] = 1
        # Per-sample MSE against the encoding, optionally weighted per class.
        if self.weights is not None:
            return torch.sum((self.weights * F.mse_loss(predictions, modified_target, reduction="none")).mean(dim=1))
        return torch.sum(F.mse_loss(predictions, modified_target, reduction="none").mean(dim=1))
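
# Illustrative sketch, not part of the original training code: how ordinal_loss
# expects its inputs. The batch size, the 5 ordinal levels, and the sigmoid
# activation are assumptions for demonstration only.
def _demo_ordinal_loss():
    criterion = ordinal_loss()
    predictions = torch.sigmoid(torch.randn(8, 5))  # 8 samples, 5 ordinal levels
    targets = torch.randint(0, 5, (8,))             # integer labels in [0, 4]
    return criterion(predictions, targets)          # scalar loss tensor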

class ContextAttention(nn.Module):
    """Multi-head attention pooling over time with a learned context vector."""

    def __init__(self, size, num_head):
        super(ContextAttention, self).__init__()
        self.attention_net = nn.Linear(size, size)
        self.num_head = num_head
        if size % num_head != 0:
            raise ValueError("size must be divisible by num_head", size, num_head)
        self.head_size = int(size / num_head)
        self.context_vector = torch.nn.Parameter(torch.Tensor(num_head, self.head_size, 1))
        nn.init.uniform_(self.context_vector, a=-1, b=1)

    def get_attention(self, x):
        attention = self.attention_net(x)
        attention_tanh = torch.tanh(attention)
        attention_split = torch.stack(attention_tanh.split(split_size=self.head_size, dim=2), dim=0)
        similarity = torch.bmm(attention_split.view(self.num_head, -1, self.head_size), self.context_vector)
        similarity = similarity.view(self.num_head, x.shape[0], -1).permute(1, 2, 0)
        return similarity

    def forward(self, x):
        attention = self.attention_net(x)
        attention_tanh = torch.tanh(attention)
        if self.head_size != 1:
            # Per-head similarity between each time step and the context vector.
            attention_split = torch.stack(attention_tanh.split(split_size=self.head_size, dim=2), dim=0)
            similarity = torch.bmm(attention_split.view(self.num_head, -1, self.head_size), self.context_vector)
            similarity = similarity.view(self.num_head, x.shape[0], -1).permute(1, 2, 0)
            similarity[x.sum(-1) == 0] = -1e4  # mask out zero-padded time steps
            softmax_weight = torch.softmax(similarity, dim=1)
            x_split = torch.stack(x.split(split_size=self.head_size, dim=2), dim=2)
            weighted_x = x_split * softmax_weight.unsqueeze(-1).repeat(1, 1, 1, x_split.shape[-1])
            attention = weighted_x.view(x_split.shape[0], x_split.shape[1], x.shape[-1])
        else:
            softmax_weight = torch.softmax(attention, dim=1)
            attention = softmax_weight * x
        sum_attention = torch.sum(attention, dim=1)
        return sum_attention
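
# Illustrative sketch, shapes are assumptions: ContextAttention pools a
# [batch, seq_len, size] sequence down to [batch, size]. size=256 and
# num_head=4 mirror how AudioModel below instantiates the layer.
def _demo_context_attention():
    attn = ContextAttention(size=256, num_head=4)
    x = torch.randn(2, 10, 256)  # [batch, seq_len, features]
    return attn(x)               # [2, 256]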

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, stride, padding)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # 1x1 projection on the skip path when the channel count changes.
        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = self.shortcut(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += identity  # skip connection
        out = self.relu(out)
        return out
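
# Illustrative sketch (shapes are assumptions): with stride=1 and padding=1
# the block preserves spatial size and only changes the channel count, which
# is how get_conv_layer below uses it.
def _demo_residual_block():
    block = ResidualBlock(in_channels=1, out_channels=64, kernel_size=3, stride=1, padding=1)
    x = torch.randn(1, 1, 88, 100)  # [batch, channels, height, width]
    return block(x)                 # [1, 64, 88, 100]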

def get_conv_layer(rep_name):
    # Frequency-axis pooling depends on the input representation.
    if "pianoroll" in rep_name:
        in_channels = 2
        kernel_width = (3, 4, 4)  # collapses 88 pitch bins
    elif "mel" in rep_name:
        in_channels = 1
        kernel_width = (3, 4, 4)  # collapses 64 mel bins
    elif "cqt" in rep_name:
        in_channels = 1
        kernel_width = (3, 4, 4)  # collapses 88 CQT bins
    else:
        raise ValueError("Representation not implemented")
    # Time-axis pooling depends on the excerpt length encoded in the name.
    if "5" in rep_name:
        kernel_height = (3, 4, 4)
    elif "10" in rep_name:
        kernel_height = (4, 5, 5)
    elif "20" in rep_name:
        kernel_height = (4, 6, 6)
    else:
        raise ValueError("Representation not implemented")
    convs = nn.Sequential(
        ResidualBlock(in_channels, 64, 3, 1, 1),
        nn.MaxPool2d((kernel_height[0], kernel_width[0])),  # adjusted pooling to handle increased length
        nn.Dropout(0.1),
        ResidualBlock(64, 128, 3, 1, 1),
        nn.MaxPool2d((kernel_height[1], kernel_width[1])),  # adjusted pooling
        nn.Dropout(0.1),
        ResidualBlock(128, 256, 3, 1, 1),
        nn.MaxPool2d((kernel_height[2], kernel_width[2])),  # adjusted pooling
        nn.Dropout(0.1)
    )
    return convs
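
# Illustrative sketch (input sizes are assumptions): tracing shapes through
# the "cqt5" stack. With 240 time frames and 88 frequency bins, the three
# max-pool stages reduce the input to a [batch, 256, 5, 1] feature map.
def _demo_get_conv_layer():
    convs = get_conv_layer("cqt5")
    x = torch.randn(1, 1, 240, 88)  # [batch, channel, time, freq]
    return convs(x)                 # [1, 256, 5, 1]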

class multimodal_cnns(nn.Module):
    def __init__(self, modality_dropout, only_cqt=False, only_pr=False):
        super().__init__()
        self.midi_branch = get_conv_layer("pianoroll5")
        self.audio_branch = get_conv_layer("cqt5")
        self.modality_dropout = modality_dropout
        self.only_cqt = only_cqt
        self.only_pr = only_pr

    def forward(self, x):
        x_midi, x_audio = x
        x_midi = self.midi_branch(x_midi).squeeze(-1)
        x_audio = self.audio_branch(x_audio).squeeze(-1)
        # Hard modality dropout: zero out one branch when only the other
        # modality should be used.
        if self.only_cqt:
            x_midi = torch.zeros_like(x_midi, device=x_midi.device)
        elif self.only_pr:
            x_audio = torch.zeros_like(x_audio, device=x_audio.device)
        # Trim the MIDI branch to the audio branch's time length before fusing.
        x_midi_trimmed = x_midi[:, :, :x_audio.size(2)]
        cnns_out = torch.cat((x_midi_trimmed, x_audio), 1)
        return cnns_out
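
# Illustrative sketch (all shapes are assumptions): the two branches are fused
# by channel-wise concatenation, so the output carries 256 + 256 = 512 channels.
def _demo_multimodal_cnns():
    model = multimodal_cnns(modality_dropout=0.0)
    x_midi = torch.randn(1, 2, 240, 88)   # piano roll: 2 channels
    x_audio = torch.randn(1, 1, 240, 88)  # CQT: 1 channel
    return model((x_midi, x_audio))       # [1, 512, 5]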

class AudioModel(nn.Module):
    def __init__(self, num_classes, rep, modality_dropout, only_cqt=False, only_pr=False):
        super(AudioModel, self).__init__()
        # All convolutional layers in a sequential block
        if "pianoroll" in rep or "cqt" in rep or "mel" in rep:
            conv = get_conv_layer(rep)
        elif "multi" in rep:
            conv = multimodal_cnns(modality_dropout, only_cqt, only_pr)
        else:
            raise ValueError("Representation not implemented")
        self.conv_layers = conv
        # GRU input size: the multimodal branch concatenates two 256-channel stacks
        self.gru_input_size = 512 if "multi" in rep else 256
        # GRU layer
        self.gru = nn.GRU(input_size=self.gru_input_size, hidden_size=128, num_layers=2,
                          batch_first=True, bidirectional=True)
        self.context_attention = ContextAttention(size=256, num_head=4)
        self.non_linearity = nn.ReLU()
        # Fully connected classification layer
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x1, kk):
        # kk is unused; kept for interface compatibility
        # Convolutional block
        x = self.conv_layers(x1)
        # Reshape to [batch, seq_len, features]; the squeeze() assumes batch size 1
        x = x.squeeze().transpose(0, 1).unsqueeze(0)
        x, _ = self.gru(x)
        # Attention pooling over time
        x = self.context_attention(x)
        # Classifier
        x = self.non_linearity(x)
        x = self.fc(x)
        return x
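
# Illustrative sketch (input shape and class count are assumptions): a single
# "cqt5" excerpt end to end. The second forward argument is unused, so None is
# passed; eval() avoids computing batch-norm statistics on a batch of one.
def _demo_audio_model():
    model = AudioModel(num_classes=5, rep="cqt5", modality_dropout=0.0)
    model.eval()
    x = torch.randn(1, 1, 240, 88)  # [batch=1, channel, time, freq]
    with torch.no_grad():
        return model(x, None)       # [1, num_classes]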

def load_json(name_file):
    with open(name_file, 'r') as fp:
        data = json.load(fp)
    return data
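
# Illustrative sketch ("example.json" is a made-up file name): load_json simply
# round-trips a JSON file into a Python object.
def _demo_load_json():
    with open("example.json", "w") as fp:
        json.dump({"train": [0, 1], "test": [2]}, fp)
    return load_json("example.json")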