# ArtifactRemovalTransformer/model/UNet_attention.py
import torch
from torch import nn
import numpy as np
__all__ = ['UNet', 'NestedUNet3', 'NestedUNet4', 'NestedUNet5', 'UNetpp3_Transformer']
class VGGBlock(nn.Module):
    """Conv1d -> BatchNorm1d -> ReLU, twice; an odd kernel with (ks - 1) // 2 padding preserves the sequence length."""

    def __init__(self, in_channels, middle_channels, out_channels, ks=7):
        super().__init__()
        padding = (ks - 1) // 2
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv1d(in_channels, middle_channels, kernel_size=ks, padding=padding)
        self.bn1 = nn.BatchNorm1d(middle_channels)
        self.conv2 = nn.Conv1d(middle_channels, out_channels, kernel_size=ks, padding=padding)
        self.bn2 = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        return out
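
# Shape sketch (illustrative): both convolutions are length-preserving, so a
# VGGBlock only changes the channel count:
#
#   block = VGGBlock(30, 32, 32)
#   y = block(torch.randn(4, 30, 1024))   # -> (4, 32, 1024)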
class MultiHeadAttentation(nn.Module):
    """Multi-head self-attention. Note that each head projects to the full
    dimension d (not d / n_heads); the concatenated heads (d * n_heads) are
    projected back to d by the output mapping."""

    def __init__(self, d, n_heads=2):
        super(MultiHeadAttentation, self).__init__()
        self.d = d
        self.n_heads = n_heads
        self.q_mappings = nn.ModuleList([nn.Linear(d, d) for _ in range(self.n_heads)])
        self.k_mappings = nn.ModuleList([nn.Linear(d, d) for _ in range(self.n_heads)])
        self.v_mappings = nn.ModuleList([nn.Linear(d, d) for _ in range(self.n_heads)])
        self.o_mappings = nn.Linear(d * n_heads, d)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, sequences):
        # sequences has shape (N, seq_length, d). Each head computes scaled
        # dot-product attention at full width d; the heads are concatenated to
        # (N, seq_length, d * n_heads) and projected back to (N, seq_length, d).
        result_head = []
        for head in range(self.n_heads):
            q = self.q_mappings[head](sequences)
            k = self.k_mappings[head](sequences)
            v = self.v_mappings[head](sequences)
            k = torch.transpose(k, 1, 2)
            attention = self.softmax(torch.matmul(q, k) / (self.d ** 0.5))
            result_head.append(torch.matmul(attention, v))
        o = torch.cat(result_head, dim=-1)
        return self.o_mappings(o)
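
# Shape sketch (illustrative): self-attention here is shape-preserving, so with
# d = 1024 the module attends over the 30 "tokens" of a (N, 30, 1024) input:
#
#   mha = MultiHeadAttentation(1024, n_heads=2)
#   y = mha(torch.randn(2, 30, 1024))   # -> (2, 30, 1024)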
class AttentionBlock(nn.Module):
    """Pre-norm transformer block: LayerNorm -> MHA -> residual, then LayerNorm -> MLP -> residual."""

    def __init__(self, hidden_d, n_heads, mlp_ratio=4):
        super(AttentionBlock, self).__init__()
        self.norm1 = nn.LayerNorm(hidden_d)
        self.mha = MultiHeadAttentation(hidden_d, n_heads)
        self.norm2 = nn.LayerNorm(hidden_d)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_d, mlp_ratio * hidden_d),
            nn.GELU(),
            nn.Linear(mlp_ratio * hidden_d, hidden_d)
        )

    def forward(self, x):
        out = x + self.mha(self.norm1(x))
        out = out + self.mlp(self.norm2(out))
        return out
class UNetpp3_Transformer(nn.Module):
    """UNet++ of depth 3 with transformer attention blocks inserted before each
    encoder stage. Deep supervision is always on: forward returns three outputs."""

    def __init__(self, num_classes, input_channels=30, n_patches=7, n_blocks=2, n_heads=2):
        super(UNetpp3_Transformer, self).__init__()
        # n_patches is accepted for interface compatibility but is currently unused.
        nb_filter = [32, 64, 128, 256]
        self.pool = nn.MaxPool1d(2)
        self.up = nn.Upsample(scale_factor=2, mode='linear', align_corners=False)
        # Registered as a non-persistent buffer so it follows the module across
        # .to(device) / .cuda() calls instead of being pinned to one device.
        self.register_buffer('positional_embeddings',
                             self.get_positional_embeddings(30, 1024),
                             persistent=False)
        self.blocks0_0 = nn.ModuleList([AttentionBlock(1024, n_heads) for _ in range(n_blocks)])
        self.blocks1_0 = nn.ModuleList([AttentionBlock(512, n_heads) for _ in range(n_blocks)])
        self.blocks2_0 = nn.ModuleList([AttentionBlock(256, n_heads) for _ in range(n_blocks)])
        self.blocks3_0 = nn.ModuleList([AttentionBlock(128, n_heads) for _ in range(n_blocks)])
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv0_1 = VGGBlock(nb_filter[0] + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1] + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2] + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv0_2 = VGGBlock(nb_filter[0] * 2 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1] * 2 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv0_3 = VGGBlock(nb_filter[0] * 3 + nb_filter[1], nb_filter[0], nb_filter[0])
        # One 1x1 conv head per nested decoder level (deep supervision).
        self.final1 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final2 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final3 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
    def forward(self, input):
        new_input = input + self.positional_embeddings
        # Apply the attention blocks in sequence (each block feeds the next)
        # before each encoder stage.
        x0_0 = new_input
        for block in self.blocks0_0:
            x0_0 = block(x0_0)
        x0_0 = self.conv0_0(x0_0)
        x1_0 = self.pool(x0_0)
        for block in self.blocks1_0:
            x1_0 = block(x1_0)
        x1_0 = self.conv1_0(x1_0)
        x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
        x2_0 = self.pool(x1_0)
        for block in self.blocks2_0:
            x2_0 = block(x2_0)
        x2_0 = self.conv2_0(x2_0)
        x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
        x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
        x3_0 = self.pool(x2_0)
        for block in self.blocks3_0:
            x3_0 = block(x3_0)
        x3_0 = self.conv3_0(x3_0)
        x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
        x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
        x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
        output1 = self.final1(x0_1)
        output2 = self.final2(x0_2)
        output3 = self.final3(x0_3)
        return output1, output2, output3
    @staticmethod
    def get_positional_embeddings(channel, time_len):
        # Sinusoidal positional embeddings over the time axis: sin rows for even
        # channel indices, cos rows for odd ones.
        result = torch.ones(channel, time_len)
        for i in range(channel):
            for j in range(time_len):
                if i % 2 == 0:
                    result[i][j] = np.sin(j / (10000 ** (i / channel)))
                else:
                    result[i][j] = np.cos(j / (10000 ** ((i - 1) / channel)))
        return result
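
# Usage sketch (illustrative): the attention widths (1024/512/256/128) and the
# positional-embedding table assume inputs of shape (N, 30, 1024):
#
#   model = UNetpp3_Transformer(num_classes=30)
#   o1, o2, o3 = model(torch.randn(2, 30, 1024))   # each output: (2, 30, 1024)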
class UNet(nn.Module):
    def __init__(self, num_classes, input_channels=3, **kwargs):
        super().__init__()
        nb_filter = [32, 64, 128, 256, 512]
        self.pool = nn.MaxPool1d(2)
        self.up = nn.Upsample(scale_factor=2, mode='linear', align_corners=False)
        # Encoder: input_channels => 32 => 64 => 128 => 256 => 512
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
        # Decoder: each stage concatenates the skip connection with the upsampled deeper feature.
        self.conv3_1 = VGGBlock(nb_filter[3] + nb_filter[4], nb_filter[3], nb_filter[3])
        self.conv2_2 = VGGBlock(nb_filter[2] + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv1_3 = VGGBlock(nb_filter[1] + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv0_4 = VGGBlock(nb_filter[0] + nb_filter[1], nb_filter[0], nb_filter[0])
        self.final = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)

    def forward(self, input):
        x0_0 = self.conv0_0(input)
        x1_0 = self.conv1_0(self.pool(x0_0))
        x2_0 = self.conv2_0(self.pool(x1_0))
        x3_0 = self.conv3_0(self.pool(x2_0))
        x4_0 = self.conv4_0(self.pool(x3_0))
        x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
        x2_2 = self.conv2_2(torch.cat([x2_0, self.up(x3_1)], 1))
        x1_3 = self.conv1_3(torch.cat([x1_0, self.up(x2_2)], 1))
        x0_4 = self.conv0_4(torch.cat([x0_0, self.up(x1_3)], 1))
        output = self.final(x0_4)
        return output
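
# Usage sketch (illustrative; the input length should be divisible by 16 so the
# four poolings halve it cleanly):
#
#   net = UNet(num_classes=30, input_channels=30)
#   y = net(torch.randn(2, 30, 1024))   # -> (2, 30, 1024)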
class NestedUNet3(nn.Module):
    """1D UNet++ (nested U-Net) of depth 3; returns three deeply supervised outputs."""

    def __init__(self, num_classes, input_channels=30, deep_supervision=False, **kwargs):
        super().__init__()
        nb_filter = [32, 64, 128, 256]
        # deep_supervision is accepted for API compatibility but unused: all
        # three supervised outputs are always returned.
        self.deep_supervision = deep_supervision
        self.pool = nn.MaxPool1d(2)
        self.up = nn.Upsample(scale_factor=2, mode='linear', align_corners=False)
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv0_1 = VGGBlock(nb_filter[0] + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1] + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2] + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv0_2 = VGGBlock(nb_filter[0] * 2 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1] * 2 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv0_3 = VGGBlock(nb_filter[0] * 3 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.final1 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final2 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final3 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)

    def forward(self, input):
        x0_0 = self.conv0_0(input)
        x1_0 = self.conv1_0(self.pool(x0_0))
        x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
        x2_0 = self.conv2_0(self.pool(x1_0))
        x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
        x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
        x3_0 = self.conv3_0(self.pool(x2_0))
        x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
        x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
        x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
        output1 = self.final1(x0_1)
        output2 = self.final2(x0_2)
        output3 = self.final3(x0_3)
        return output1, output2, output3
class NestedUNet4(nn.Module):
    """1D UNet++ (nested U-Net) of depth 4; returns four deeply supervised outputs."""

    def __init__(self, num_classes, input_channels=30, deep_supervision=False, **kwargs):
        super().__init__()
        nb_filter = [32, 64, 128, 256, 512]
        # deep_supervision is accepted for API compatibility but unused: all
        # four supervised outputs are always returned.
        self.deep_supervision = deep_supervision
        self.pool = nn.MaxPool1d(2)
        self.up = nn.Upsample(scale_factor=2, mode='linear', align_corners=False)
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
        self.conv0_1 = VGGBlock(nb_filter[0] + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1] + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2] + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv3_1 = VGGBlock(nb_filter[3] + nb_filter[4], nb_filter[3], nb_filter[3])
        self.conv0_2 = VGGBlock(nb_filter[0] * 2 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1] * 2 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_2 = VGGBlock(nb_filter[2] * 2 + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv0_3 = VGGBlock(nb_filter[0] * 3 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_3 = VGGBlock(nb_filter[1] * 3 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv0_4 = VGGBlock(nb_filter[0] * 4 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.final1 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final2 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final3 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final4 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)

    def forward(self, input):
        x0_0 = self.conv0_0(input)
        x1_0 = self.conv1_0(self.pool(x0_0))
        x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
        x2_0 = self.conv2_0(self.pool(x1_0))
        x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
        x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
        x3_0 = self.conv3_0(self.pool(x2_0))
        x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
        x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
        x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
        x4_0 = self.conv4_0(self.pool(x3_0))
        x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
        x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
        x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
        x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))
        output1 = self.final1(x0_1)
        output2 = self.final2(x0_2)
        output3 = self.final3(x0_3)
        output4 = self.final4(x0_4)
        return output1, output2, output3, output4
class NestedUNet5(nn.Module):
    """1D UNet++ (nested U-Net) of depth 5; returns five deeply supervised outputs."""

    def __init__(self, num_classes, input_channels=30, deep_supervision=False, **kwargs):
        super().__init__()
        nb_filter = [32, 64, 128, 256, 512, 1024]
        # deep_supervision is accepted for API compatibility but unused: all
        # five supervised outputs are always returned.
        self.deep_supervision = deep_supervision
        self.pool = nn.MaxPool1d(2)
        self.up = nn.Upsample(scale_factor=2, mode='linear', align_corners=False)
        self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
        self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
        self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
        self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
        self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
        self.conv5_0 = VGGBlock(nb_filter[4], nb_filter[5], nb_filter[5])
        self.conv0_1 = VGGBlock(nb_filter[0] + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_1 = VGGBlock(nb_filter[1] + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_1 = VGGBlock(nb_filter[2] + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv3_1 = VGGBlock(nb_filter[3] + nb_filter[4], nb_filter[3], nb_filter[3])
        self.conv4_1 = VGGBlock(nb_filter[4] + nb_filter[5], nb_filter[4], nb_filter[4])
        self.conv0_2 = VGGBlock(nb_filter[0] * 2 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_2 = VGGBlock(nb_filter[1] * 2 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_2 = VGGBlock(nb_filter[2] * 2 + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv3_2 = VGGBlock(nb_filter[3] * 2 + nb_filter[4], nb_filter[3], nb_filter[3])
        self.conv0_3 = VGGBlock(nb_filter[0] * 3 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_3 = VGGBlock(nb_filter[1] * 3 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv2_3 = VGGBlock(nb_filter[2] * 3 + nb_filter[3], nb_filter[2], nb_filter[2])
        self.conv0_4 = VGGBlock(nb_filter[0] * 4 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.conv1_4 = VGGBlock(nb_filter[1] * 4 + nb_filter[2], nb_filter[1], nb_filter[1])
        self.conv0_5 = VGGBlock(nb_filter[0] * 5 + nb_filter[1], nb_filter[0], nb_filter[0])
        self.final1 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final2 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final3 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final4 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)
        self.final5 = nn.Conv1d(nb_filter[0], num_classes, kernel_size=1)

    def forward(self, input):
        x0_0 = self.conv0_0(input)
        x1_0 = self.conv1_0(self.pool(x0_0))
        x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
        x2_0 = self.conv2_0(self.pool(x1_0))
        x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
        x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
        x3_0 = self.conv3_0(self.pool(x2_0))
        x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
        x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
        x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
        x4_0 = self.conv4_0(self.pool(x3_0))
        x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
        x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
        x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
        x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))
        x5_0 = self.conv5_0(self.pool(x4_0))
        x4_1 = self.conv4_1(torch.cat([x4_0, self.up(x5_0)], 1))
        x3_2 = self.conv3_2(torch.cat([x3_0, x3_1, self.up(x4_1)], 1))
        x2_3 = self.conv2_3(torch.cat([x2_0, x2_1, x2_2, self.up(x3_2)], 1))
        x1_4 = self.conv1_4(torch.cat([x1_0, x1_1, x1_2, x1_3, self.up(x2_3)], 1))
        x0_5 = self.conv0_5(torch.cat([x0_0, x0_1, x0_2, x0_3, x0_4, self.up(x1_4)], 1))
        output1 = self.final1(x0_1)
        output2 = self.final2(x0_2)
        output3 = self.final3(x0_3)
        output4 = self.final4(x0_4)
        output5 = self.final5(x0_5)
        return output1, output2, output3, output4, output5