import os
import random

import numpy as np
import torch
from torch.utils.data import Dataset
from tqdm.auto import tqdm

from models.interpretable_diffusion.model_utils import (
    normalize_to_neg_one_to_one,
    unnormalize_to_zero_to_one,
)
from utils.masking_utils import noise_mask

# NOTE: `random` and `model` are not used by the live code in this file. They
# are only referenced by the disabled experimental generator kept as a comment
# at the bottom. They stay defined here in case other modules import them.
model = torch.nn.Linear(2, 1)


class SineDataset(Dataset):
    """Synthetic multivariate sine-wave dataset.

    Every sample is a ``(window, dim)`` array in which each feature is a sine
    wave with its own randomly drawn frequency and phase. In ``'test'`` mode
    each sample is paired with a boolean mask of the same shape, built either
    by ``noise_mask`` (when ``missing_ratio`` is given) or by zeroing the last
    ``predict_length`` time steps (when ``predict_length`` is given).

    Args:
        window: length of every generated series.
        num: number of series to generate.
        dim: number of features per series.
        save2npy: if True, write the generated data (and the mask produced by
            ``mask_data``) as ``.npy`` files under ``<output_dir>/samples``.
        neg_one_to_one: if True, pass the data through
            ``normalize_to_neg_one_to_one`` (presumably [0, 1] -> [-1, 1],
            judging by the helper's name; confirm against model_utils).
        seed: seed used for data generation and for ``mask_data``.
        period: ``'train'`` or ``'test'``.
        output_dir: root directory for saved artefacts.
        predict_length: test only; number of trailing steps to mask out.
        missing_ratio: test only; forwarded to ``noise_mask``.
        style, distribution, mean_mask_length: forwarded to ``noise_mask``.

    Raises:
        AssertionError: if ``period`` is invalid, or if ``predict_length`` /
            ``missing_ratio`` is supplied while ``period == 'train'``.
        NotImplementedError: if ``period == 'test'`` and neither
            ``missing_ratio`` nor ``predict_length`` is supplied.
    """

    def __init__(
        self,
        window=128,
        num=30000,
        dim=12,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period='train',
        output_dir='./OUTPUT',
        predict_length=None,
        missing_ratio=None,
        style='separate',
        distribution='geometric',
        mean_mask_length=3
    ):
        super().__init__()
        assert period in ['train', 'test'], 'period must be train or test.'
        if period == 'train':
            # The previous check used bitwise `~` on a bool, which yields -1
            # or -2 (both truthy) and therefore could never fail.
            assert predict_length is None and missing_ratio is None, (
                'predict_length and missing_ratio must be None when period is train.'
            )

        self.pred_len, self.missing_ratio = predict_length, missing_ratio
        self.style, self.distribution, self.mean_mask_length = style, distribution, mean_mask_length

        self.dir = os.path.join(output_dir, 'samples')
        os.makedirs(self.dir, exist_ok=True)

        self.rawdata = self.sine_data_generation(
            no=num,
            seq_len=window,
            dim=dim,
            save2npy=save2npy,
            seed=seed,
            dir=self.dir,
            period=period,
        )
        self.auto_norm = neg_one_to_one
        self.samples = self.normalize(self.rawdata)
        self.var_num = dim
        self.sample_num = self.samples.shape[0]
        self.window = window

        # mask_data() reads self.save2npy / self.window, so set them first.
        self.period, self.save2npy = period, save2npy
        if period == 'test':
            if missing_ratio is not None:
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                masks = np.ones(self.samples.shape, dtype=bool)
                # `-0:` would select the whole axis, so a zero-length horizon
                # must be skipped explicitly instead of hiding every step.
                if predict_length > 0:
                    masks[:, -predict_length:, :] = False
                self.masking = masks
            else:
                raise NotImplementedError()

    def normalize(self, rawdata):
        """Return ``rawdata`` rescaled when ``auto_norm`` is on, else unchanged."""
        if self.auto_norm:
            return normalize_to_neg_one_to_one(rawdata)
        # Previously this path hit an unbound local and raised.
        return rawdata

    def unnormalize(self, data):
        """Undo ``normalize``; a no-op when ``auto_norm`` is off."""
        if self.auto_norm:
            return unnormalize_to_zero_to_one(data)
        return data

    @staticmethod
    def sine_data_generation(no, seq_len, dim, save2npy=True, seed=123, dir="./", period='train'):
        """Sine data generation.

        Args:
            - no: the number of samples
            - seq_len: sequence length of the time-series
            - dim: feature dimensions
            - save2npy: if True, save the result as
              ``sine_ground_truth_{seq_len}_{period}.npy`` inside ``dir``
            - seed: seed for NumPy's global RNG during generation
            - dir: directory the ``.npy`` file is written to
            - period: only used in the saved file name

        Returns:
            - data: generated data, shape ``(no, seq_len, dim)``, values in [0, 1]
        """
        # Store the state of the global RNG so seeding here does not leak out;
        # the finally-block restores it even if generation fails.
        st0 = np.random.get_state()
        np.random.seed(seed)
        try:
            steps = np.arange(seq_len)[:, None]  # (seq_len, 1) column of time indices
            data = np.empty((no, seq_len, dim))
            for i in tqdm(range(no), total=no, desc="Sampling sine-dataset"):
                # One (freq, phase) pair per feature. A (dim, 2) draw consumes
                # the random stream in the same order as drawing freq then
                # phase for each feature in turn.
                draws = np.random.uniform(0, 0.1, size=(dim, 2))
                freq, phase = draws[:, 0], draws[:, 1]
                # Broadcast to (seq_len, dim), then map [-1, 1] onto [0, 1].
                data[i] = (np.sin(freq * steps + phase) + 1) * 0.5
        finally:
            # Restore RNG.
            np.random.set_state(st0)

        if save2npy:
            np.save(os.path.join(dir, f"sine_ground_truth_{seq_len}_{period}.npy"), data)

        return data

    def mask_data(self, seed=2023):
        """Build one ``noise_mask`` per sample and return them as a bool array.

        The masks are generated under a temporarily seeded global NumPy RNG.
        When ``save2npy`` is set, the (float-typed) mask array is also written
        to ``sine_masking_{window}.npy`` in ``self.dir``.

        Returns:
            Boolean array with the same shape as ``self.samples``.
        """
        masks = np.ones_like(self.samples)
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        try:
            for idx, x in enumerate(self.samples):  # x: (seq_length, feat_dim) array
                # noise_mask yields a (seq_length, feat_dim) boolean array.
                masks[idx, :, :] = noise_mask(
                    x,
                    self.missing_ratio,
                    self.mean_mask_length,
                    self.style,
                    self.distribution,
                )

            if self.save2npy:
                np.save(os.path.join(self.dir, f"sine_masking_{self.window}.npy"), masks)
        finally:
            # Restore RNG.
            np.random.set_state(st0)
        return masks.astype(bool)

    def __getitem__(self, ind):
        """Return sample ``ind`` as a float tensor, plus its mask in test mode."""
        x = torch.from_numpy(self.samples[ind, :, :]).float()  # (seq_length, feat_dim)
        if self.period == 'test':
            m = self.masking[ind, :, :]  # (seq_length, feat_dim) boolean array
            return x, torch.from_numpy(m)
        return x

    def __len__(self):
        return self.sample_num


# ---------------------------------------------------------------------------
# Disabled experimental variant (reference only).
#
# A commented-out copy of SineDataset used to follow here. It matched the class
# above except for its defaults (num=223, dim=3), an extra `**kargs` catch-all
# in __init__, and the body of the per-sample loop in sine_data_generation.
# That loop used a cosine-annealing learning-rate trace as the signal for each
# feature instead of a sine wave. It is the only user of the module-level
# `model` and of `random`:
#
#     temp = []
#     for k in range(dim):
#         lrs = []
#         optimizer = torch.optim.SGD(model.parameters(), lr=0.3)
#         lr_sched = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
#             optimizer, T_0=7, T_mult=1, eta_min=0.005, last_epoch=-1
#         )
#         for _ in range(random.randint(1, 14)):
#             lr_sched.step()
#         for _ in range(seq_len):
#             lr_sched.step()
#             lrs.append(optimizer.param_groups[0]["lr"])
#         temp.append(lrs)
#     temp = np.transpose(np.asarray(temp))
#     data.append(temp)
# ---------------------------------------------------------------------------