audio-difficulty / model.py
PRamoneda
gpu to cpu
a5af45b
import json
import math
import os
from statistics import mean, stdev
import torch
from sklearn.metrics import mean_squared_error, balanced_accuracy_score
from torch import nn
from torch.nn import functional as F
import utils
from utils import prediction2label
from scipy.stats import kendalltau
class ordinal_loss(nn.Module):
"""Ordinal regression with encoding as in https://arxiv.org/pdf/0704.1028.pdf"""
def __init__(self, weight_class=False):
super(ordinal_loss, self).__init__()
self.weights = weight_class
def forward(self, predictions, targets):
# Fill in ordinalCoefficientVariationLoss target function, i.e. 0 -> [1,0,0,...]
modified_target = torch.zeros_like(predictions)
for i, target in enumerate(targets):
modified_target[i, 0:target + 1] = 1
# if torch tensor is empty, return 0
if predictions.shape[0] == 0:
return 0
# loss
if self.weights is not None:
return torch.sum((self.weights * F.mse_loss(predictions, modified_target, reduction="none")).mean(axis=1))
else:
return torch.sum((F.mse_loss(predictions, modified_target, reduction="none")).mean(axis=1))
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
class ContextAttention(nn.Module):
def __init__(self, size, num_head):
super(ContextAttention, self).__init__()
self.attention_net = nn.Linear(size, size)
self.num_head = num_head
if size % num_head != 0:
raise ValueError("size must be dividable by num_head", size, num_head)
self.head_size = int(size / num_head)
self.context_vector = torch.nn.Parameter(torch.Tensor(num_head, self.head_size, 1))
nn.init.uniform_(self.context_vector, a=-1, b=1)
def get_attention(self, x):
attention = self.attention_net(x)
attention_tanh = torch.tanh(attention)
attention_split = torch.stack(attention_tanh.split(split_size=self.head_size, dim=2), dim=0)
similarity = torch.bmm(attention_split.view(self.num_head, -1, self.head_size), self.context_vector)
similarity = similarity.view(self.num_head, x.shape[0], -1).permute(1, 2, 0)
return similarity
def forward(self, x):
attention = self.attention_net(x)
attention_tanh = torch.tanh(attention)
if self.head_size != 1:
attention_split = torch.stack(attention_tanh.split(split_size=self.head_size, dim=2), dim=0)
similarity = torch.bmm(attention_split.view(self.num_head, -1, self.head_size), self.context_vector)
similarity = similarity.view(self.num_head, x.shape[0], -1).permute(1, 2, 0)
similarity[x.sum(-1) == 0] = -1e4 # mask out zero padded_ones
softmax_weight = torch.softmax(similarity, dim=1)
x_split = torch.stack(x.split(split_size=self.head_size, dim=2), dim=2)
weighted_x = x_split * softmax_weight.unsqueeze(-1).repeat(1, 1, 1, x_split.shape[-1])
attention = weighted_x.view(x_split.shape[0], x_split.shape[1], x.shape[-1])
else:
softmax_weight = torch.softmax(attention, dim=1)
attention = softmax_weight * x
sum_attention = torch.sum(attention, dim=1)
return sum_attention
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
super(ResidualBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU()
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, stride, padding)
self.bn2 = nn.BatchNorm2d(out_channels)
self.shortcut = nn.Sequential()
if in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
identity = self.shortcut(x)
out = self.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += identity # Skip Connection
out = self.relu(out)
return out
def get_conv_layer(rep_name):
if "pianoroll" in rep_name:
in_channels = 2
kernel_width = (3, 4, 4) # 88
elif "mel" in rep_name:
in_channels = 1
kernel_width = (3, 4, 4) # 64
elif "cqt" in rep_name:
in_channels = 1
kernel_width = (3, 4, 4) # 88
else:
raise ValueError("Representation not implemented")
if "5" in rep_name:
kernel_height = (3, 4, 4)
elif "10" in rep_name:
kernel_height = (4, 5, 5)
elif "20" in rep_name:
kernel_height = (4, 6, 6)
else:
raise ValueError("Representation not implemented")
convs = nn.Sequential(
ResidualBlock(in_channels, 64, 3, 1, 1),
nn.MaxPool2d((kernel_height[0], kernel_width[0])), # Adjusted pooling to handle increased length
nn.Dropout(0.1),
ResidualBlock(64, 128, 3, 1, 1),
nn.MaxPool2d((kernel_height[1], kernel_width[1])), # Adjusted pooling
nn.Dropout(0.1),
ResidualBlock(128, 256, 3, 1, 1),
nn.MaxPool2d((kernel_height[2], kernel_width[2])), # Adjusted pooling
nn.Dropout(0.1)
)
return convs
class multimodal_cnns(nn.Module):
def __init__(self, modality_dropout, only_cqt=False, only_pr=False):
super().__init__()
self.midi_branch = get_conv_layer("pianoroll5")
self.audio_branch = get_conv_layer("cqt5")
self.modality_dropout = modality_dropout
self.only_cqt = only_cqt
self.only_pr = only_pr
def forward(self, x):
x_midi, x_audio = x
x_midi = self.midi_branch(x_midi).squeeze(-1)
x_audio = self.audio_branch(x_audio).squeeze(-1)
# do a modality dropout
if self.only_cqt:
x_midi = torch.zeros_like(x_midi, device=x_midi.device)
elif self.only_pr:
x_audio = torch.zeros_like(x_audio, device=x_audio.device)
x_midi_trimmed = x_midi[:, :, :x_audio.size(2)]
cnns_out = torch.cat((x_midi_trimmed, x_audio), 1)
return cnns_out
class AudioModel(nn.Module):
def __init__(self, num_classes, rep, modality_dropout, only_cqt=False, only_pr=False):
super(AudioModel, self).__init__()
# All Convolutional Layers in a Sequential Block
if "pianoroll" in rep:
conv = get_conv_layer(rep)
elif "cqt" in rep:
conv = get_conv_layer(rep)
elif "mel" in rep:
conv = get_conv_layer(rep)
elif "multi" in rep:
conv = multimodal_cnns(modality_dropout, only_cqt, only_pr)
self.conv_layers = conv
# Calculate the size of GRU input feature
self.gru_input_size = 512 if "multi" in rep else 256
# GRU Layer
self.gru = nn.GRU(input_size=self.gru_input_size, hidden_size=128, num_layers=2,
batch_first=True, bidirectional=True)
self.context_attention = ContextAttention(size=256, num_head=4)
self.non_linearity = nn.ReLU()
# Fully connected layer
self.fc = nn.Linear(256, num_classes)
def forward(self, x1, kk):
# Applying Convolutional Block
# print(x1.shape)
x = self.conv_layers(x1)
# Reshape for GRU input
x = x.squeeze().transpose(0, 1).unsqueeze(0) # Reshaping to [batch, seq_len, features]
# print(x.shape)
x, _ = self.gru(x)
# Attention
x = self.context_attention(x)
# classiffier
x = self.non_linearity(x)
x = self.fc(x)
return x
def load_json(name_file):
with open(name_file, 'r') as fp:
data = json.load(fp)
return data