Spaces:
Runtime error
Runtime error
import os | |
import csv | |
import torch | |
import random | |
import numpy as np | |
from torch.autograd import Variable | |
from model.tf_model import subsequent_mask | |
from torch.utils.data import Dataset | |
class Batch: | |
"Object for holding a batch of data with mask during training." | |
def __init__(self, src, trg=None, ys=None, pad=0): | |
self.src = src | |
self.ys = ys | |
self.src_len = src[:, 0, :] | |
self.src_mask = (self.src_len != pad).unsqueeze(-2) | |
if trg is not None: | |
# ------for EEG------- | |
self.trg = trg[:, :, :-1] | |
self.trg_len = trg[:, 0, :] | |
self.trg_x = self.trg_len[:, :-1] | |
self.trg_y = self.trg_len[:, 1:] | |
# ------for NLP------- | |
#self.trg = trg[:, :-1] | |
#self.trg_x = trg[:, :-1] | |
#self.trg_y = trg[:, 1:] | |
self.trg_mask = \ | |
self.make_std_mask(self.trg_x, pad) | |
self.ntokens = (self.trg_y != pad).data.sum() | |
def make_std_mask(tgt, pad): | |
"Create a mask to hide padding and future words." | |
tgt_mask = (tgt != pad).unsqueeze(-2) | |
tgt_mask = tgt_mask & Variable( | |
subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)) | |
return tgt_mask | |
def data_gen(V, batch, nbatches): | |
"Generate random data for a src-tgt copy task." | |
for i in range(nbatches): | |
data = torch.from_numpy(np.random.randint(1, V, size=(batch, 10))) | |
data[:, 0] = 1 | |
src = Variable(data, requires_grad=False) | |
tgt = Variable(data, requires_grad=False) | |
yield Batch(src, tgt, 0) | |
def data_load(train_loader): | |
print("data_load:", len(train_loader)) | |
for i, (attr, target, ys) in enumerate(train_loader): | |
#src = Variable(attr, requires_grad=False) | |
#tgt = Variable(target, requires_grad=False) | |
#print("data_load1:", attr.shape) | |
#print("data_load2:", target.shape) | |
src = attr.cuda() | |
tgt = target.cuda() | |
ys = ys.cuda() | |
yield Batch(src, tgt, ys, 0) | |
class preDataset(Dataset): | |
def __init__(self, mode, train_len=870300, block_num=1): | |
self.sample_rate = 256 | |
self.lenth = 870300 #train_len | |
self.lenthtest = 3600 | |
self.lenthval = 3500 | |
self.mode = mode | |
self.block_num = block_num | |
def __len__(self): | |
if self.mode == 2: | |
return self.lenthval | |
elif self.mode == 1: | |
return self.lenthtest | |
else: | |
return self.lenth | |
def __getitem__(self, idx): | |
''' | |
:param idx: | |
:return: | |
''' | |
data_mode = ["Brain", "ChannelNoise", "Eye", "Heart", "LineNoise", "Muscle", "Other"] | |
## step 1 locate idx into dataset | |
dataset = ["Hyperscanning_navigation", "Hyperscanning_slapjack", "Lane_keeping"] | |
if idx < 249816: | |
now_idx = idx | |
dataloc = 0 | |
elif idx >= 249816 and idx < 506365: | |
now_idx = idx - 249816 | |
dataloc = 1 | |
elif idx >= 506365: | |
now_idx = idx - 506365 | |
dataloc = 2 | |
## step 2 read data | |
folder_name = './MetaPreTrain/' + dataset[dataloc] + '/3_ICA/' | |
allFileList = os.listdir(folder_name) | |
file_name1 = folder_name + allFileList[now_idx] | |
#print("preDataset1: ", allFileList[now_idx]) | |
## get after 4 sec data | |
string_array = allFileList[now_idx] | |
parts = string_array[0].split('_') | |
numeric_part = parts[-1].split('.')[0] | |
new_numeric_part = str(int(numeric_part) + 4) | |
parts[-1] = new_numeric_part + '.csv' | |
string_array = '_'.join(parts) | |
file_name2 = folder_name + string_array | |
#print("preDataset2: ", string_array) | |
try: | |
data_nosie = self.read_train_data(file_name1) | |
data_clean = self.read_train_data(file_name2) | |
except: | |
data_nosie = self.read_train_data(file_name1) | |
data_clean = data_nosie | |
## step 3 signal normalize | |
max_num = np.max(data_nosie) | |
data_avg = np.average(data_nosie) | |
data_std = np.std(data_nosie) | |
if int(data_std) != 0: | |
target = np.array((data_clean - data_avg) / data_std).astype(np.float64) | |
attr = np.array((data_nosie - data_avg) / data_std).astype(np.float64) | |
else: | |
target = np.array(data_clean - data_avg).astype(np.float64) | |
attr = np.array(data_nosie - data_avg).astype(np.float64) | |
## step 4 deep copy & return | |
#target = target.copy() | |
target = torch.FloatTensor(target) | |
#attr = attr.copy() | |
attr = torch.FloatTensor(attr) | |
ys = torch.ones(30, 1023).fill_(1).type_as(attr) | |
return attr, target, ys | |
def read_train_data(self, file_name): | |
with open(file_name, 'r', newline='') as f: | |
lines = csv.reader(f) | |
data = [] | |
for line in lines: | |
data.append(line) | |
new_data = np.array(data).astype(np.float64) | |
return new_data | |
class myDataset(Dataset): | |
def __init__(self, mode, train_len=0, block_num=1): | |
self.sample_rate = 256 | |
self.lenth = train_len | |
self.lenthtest = 3600 | |
self.lenthval = 3500 | |
self.mode = mode | |
self.block_num = block_num | |
def __len__(self): | |
if self.mode == 2: | |
return self.lenthval | |
elif self.mode == 1: | |
return self.lenthtest | |
else: | |
return self.lenth | |
def __getitem__(self, idx): | |
''' | |
:param idx: | |
:return: | |
''' | |
data_mode = ["Brain", "ChannelNoise", "Eye", "Heart", "LineNoise", "Muscle", "Other"] | |
if self.mode == 2: | |
allFileList = os.listdir("./Real_EEG/val/Brain/") | |
file_name = './Real_EEG/val/Brain/' + allFileList[idx] | |
data_clean = self.read_train_data(file_name) | |
for i in range(7): | |
file_name = './Real_EEG/val/' + data_mode[random.randint(0, 6)] + '/' + allFileList[idx] | |
if os.path.isfile(file_name): | |
data_nosie = self.read_train_data(file_name) | |
break | |
else: | |
data_nosie = data_clean | |
elif self.mode == 1: | |
allFileList = os.listdir("./Real_EEG/test/Brain/") | |
file_name = './Real_EEG/test/Brain/' + allFileList[idx] | |
data_clean = self.read_train_data(file_name) | |
for i in range(7): | |
file_name = './Real_EEG/test/' + data_mode[random.randint(0, 6)] + '/' + allFileList[idx] | |
if os.path.isfile(file_name): | |
data_nosie = self.read_train_data(file_name) | |
break | |
else: | |
data_nosie = data_clean | |
else: | |
allFileList = os.listdir("./Real_EEG/train/Brain/") | |
file_name = './Real_EEG/train/Brain/' + allFileList[idx] | |
#print("dataloader: ", file_name) | |
data_clean = self.read_train_data(file_name) | |
for i in range(7): | |
file_name = './Real_EEG/train/' + data_mode[random.randint(0, 6)] + '/' + allFileList[idx] | |
if os.path.isfile(file_name): | |
data_nosie = self.read_train_data(file_name) | |
break | |
else: | |
data_nosie = data_clean | |
#print(file_name) | |
#print("data_set", noise.shape) | |
max_num = np.max(data_nosie) | |
data_avg = np.average(data_nosie) | |
data_std = np.std(data_nosie) | |
#max_num = 100 | |
#print("max_num: ", max_num) | |
#target = np.array(data / max_num).astype(np.float) | |
if int(data_std) != 0: | |
target = np.array((data_clean - data_avg) / data_std).astype(np.float64) | |
attr = np.array((data_nosie - data_avg) / data_std).astype(np.float64) | |
else: | |
target = np.array(data_clean - data_avg).astype(np.float64) | |
attr = np.array(data_nosie - data_avg).astype(np.float64) | |
## step 4 deep copy & return | |
# target = target.copy() | |
target = torch.FloatTensor(target) | |
# attr = attr.copy() | |
attr = torch.FloatTensor(attr) | |
ys = torch.ones(30, 1023).fill_(1).type_as(attr) | |
return attr, target, ys | |
def read_train_data(self, file_name): | |
with open(file_name, 'r', newline='') as f: | |
lines = csv.reader(f) | |
data = [] | |
for line in lines: | |
data.append(line) | |
new_data = np.array(data).astype(np.float64) | |
''' for training 19 channels | |
row = np.array([0,1,2,3,4,5,6,12,13,14,15,16,22,23,24,25,26,27,29]) | |
new_data = [] | |
for i in range(19): | |
#print(i, row[i]) | |
#print(data[row[i]].shape) | |
new_data.append(data[row[i]]) | |
new_data = np.array(new_data).astype(np.float) | |
''' | |
# data = data.T | |
return new_data |