Spaces:
Sleeping
Sleeping
import os | |
import torch | |
import numpy as np | |
from tqdm.auto import tqdm | |
from torch.utils.data import Dataset | |
from models.interpretable_diffusion.model_utils import ( | |
normalize_to_neg_one_to_one, | |
unnormalize_to_zero_to_one, | |
) | |
from utils.masking_utils import noise_mask | |
import torch | |
import random | |
model = torch.nn.Linear(2, 1) | |
class SineDataset(Dataset): | |
def __init__( | |
self, | |
window=128, | |
num=30000, | |
dim=12, | |
save2npy=True, | |
neg_one_to_one=True, | |
seed=123, | |
period='train', | |
output_dir='./OUTPUT', | |
predict_length=None, | |
missing_ratio=None, | |
style='separate', | |
distribution='geometric', | |
mean_mask_length=3 | |
): | |
super(SineDataset, self).__init__() | |
assert period in ['train', 'test'], 'period must be train or test.' | |
if period == 'train': | |
assert ~(predict_length is not None or missing_ratio is not None), '' | |
self.pred_len, self.missing_ratio = predict_length, missing_ratio | |
self.style, self.distribution, self.mean_mask_length = style, distribution, mean_mask_length | |
self.dir = os.path.join(output_dir, 'samples') | |
os.makedirs(self.dir, exist_ok=True) | |
self.rawdata = self.sine_data_generation(no=num, seq_len=window, dim=dim, save2npy=save2npy, | |
seed=seed, dir=self.dir, period=period) | |
self.auto_norm = neg_one_to_one | |
self.samples = self.normalize(self.rawdata) | |
self.var_num = dim | |
self.sample_num = self.samples.shape[0] | |
self.window = window | |
self.period, self.save2npy = period, save2npy | |
if period == 'test': | |
if missing_ratio is not None: | |
self.masking = self.mask_data(seed) | |
elif predict_length is not None: | |
masks = np.ones(self.samples.shape) | |
masks[:, -predict_length:, :] = 0 | |
self.masking = masks.astype(bool) | |
else: | |
raise NotImplementedError() | |
def normalize(self, rawdata): | |
if self.auto_norm: | |
data = normalize_to_neg_one_to_one(rawdata) | |
return data | |
def unnormalize(self, data): | |
if self.auto_norm: | |
data = unnormalize_to_zero_to_one(data) | |
return data | |
def sine_data_generation(no, seq_len, dim, save2npy=True, seed=123, dir="./", period='train'): | |
"""Sine data generation. | |
Args: | |
- no: the number of samples | |
- seq_len: sequence length of the time-series | |
- dim: feature dimensions | |
Returns: | |
- data: generated data | |
""" | |
# Store the state of the RNG to restore later. | |
st0 = np.random.get_state() | |
np.random.seed(seed) | |
# Initialize the output | |
data = list() | |
# Generate sine data | |
for i in tqdm(range(0, no), total=no, desc="Sampling sine-dataset"): | |
# Initialize each time-series | |
temp = list() | |
# For each feature | |
for k in range(dim): | |
# Randomly drawn frequency and phase | |
freq = np.random.uniform(0, 0.1) | |
phase = np.random.uniform(0, 0.1) | |
# Generate sine signal based on the drawn frequency and phase | |
temp_data = [np.sin(freq * j + phase) for j in range(seq_len)] | |
temp.append(temp_data) | |
# Align row/column | |
temp = np.transpose(np.asarray(temp)) | |
# Normalize to [0,1] | |
temp = (temp + 1)*0.5 | |
# Stack the generated data | |
data.append(temp) | |
# Restore RNG. | |
np.random.set_state(st0) | |
data = np.array(data) | |
if save2npy: | |
np.save(os.path.join(dir, f"sine_ground_truth_{seq_len}_{period}.npy"), data) | |
return data | |
def mask_data(self, seed=2023): | |
masks = np.ones_like(self.samples) | |
# Store the state of the RNG to restore later. | |
st0 = np.random.get_state() | |
np.random.seed(seed) | |
for idx in range(self.samples.shape[0]): | |
x = self.samples[idx, :, :] # (seq_length, feat_dim) array | |
mask = noise_mask(x, self.missing_ratio, self.mean_mask_length, self.style, | |
self.distribution) # (seq_length, feat_dim) boolean array | |
masks[idx, :, :] = mask | |
if self.save2npy: | |
np.save(os.path.join(self.dir, f"sine_masking_{self.window}.npy"), masks) | |
# Restore RNG. | |
np.random.set_state(st0) | |
return masks.astype(bool) | |
def __getitem__(self, ind): | |
if self.period == 'test': | |
x = self.samples[ind, :, :] # (seq_length, feat_dim) array | |
m = self.masking[ind, :, :] # (seq_length, feat_dim) boolean array | |
return torch.from_numpy(x).float(), torch.from_numpy(m) | |
x = self.samples[ind, :, :] # (seq_length, feat_dim) array | |
return torch.from_numpy(x).float() | |
def __len__(self): | |
return self.sample_num | |
# class SineDataset(Dataset): | |
# def __init__( | |
# self, | |
# window=128, | |
# num=223, | |
# dim=3, | |
# save2npy=True, | |
# neg_one_to_one=True, | |
# seed=123, | |
# period="train", | |
# output_dir="./OUTPUT", | |
# predict_length=None, | |
# missing_ratio=None, | |
# style="separate", | |
# distribution="geometric", | |
# mean_mask_length=3, | |
# **kargs, | |
# ): | |
# super(SineDataset, self).__init__() | |
# assert period in ["train", "test"], "period must be train or test." | |
# if period == "train": | |
# assert ~(predict_length is not None or missing_ratio is not None), "" | |
# self.pred_len, self.missing_ratio = predict_length, missing_ratio | |
# self.style, self.distribution, self.mean_mask_length = ( | |
# style, | |
# distribution, | |
# mean_mask_length, | |
# ) | |
# self.dir = os.path.join(output_dir, "samples") | |
# os.makedirs(self.dir, exist_ok=True) | |
# self.rawdata = self.sine_data_generation( | |
# no=num, | |
# seq_len=window, | |
# dim=dim, | |
# save2npy=save2npy, | |
# seed=seed, | |
# dir=self.dir, | |
# period=period, | |
# ) | |
# self.auto_norm = neg_one_to_one | |
# self.samples = self.normalize(self.rawdata) | |
# self.var_num = dim | |
# self.sample_num = self.samples.shape[0] | |
# self.window = window | |
# self.period, self.save2npy = period, save2npy | |
# if period == "test": | |
# if missing_ratio is not None: | |
# self.masking = self.mask_data(seed) | |
# elif predict_length is not None: | |
# masks = np.ones(self.samples.shape) | |
# masks[:, -predict_length:, :] = 0 | |
# self.masking = masks.astype(bool) | |
# else: | |
# raise NotImplementedError() | |
# def normalize(self, rawdata): | |
# if self.auto_norm: | |
# data = normalize_to_neg_one_to_one(rawdata) | |
# return data | |
# def unnormalize(self, data): | |
# if self.auto_norm: | |
# data = unnormalize_to_zero_to_one(data) | |
# return data | |
# @staticmethod | |
# def sine_data_generation( | |
# no, seq_len, dim, save2npy=True, seed=123, dir="./", period="train" | |
# ): | |
# """Sine data generation. | |
# Args: | |
# - no: the number of samples | |
# - seq_len: sequence length of the time-series | |
# - dim: feature dimensions | |
# Returns: | |
# - data: generated data | |
# """ | |
# # Store the state of the RNG to restore later. | |
# st0 = np.random.get_state() | |
# np.random.seed(seed) | |
# # Initialize the output | |
# data = list() | |
# # Generate sine data | |
# for i in tqdm(range(0, no), total=no, desc="Sampling sine-dataset"): | |
# # Initialize each time-series | |
# # temp = list() | |
# # # For each feature | |
# # for k in range(dim): | |
# # # Randomly drawn frequency and phase | |
# # freq = np.random.uniform(0, 0.1) | |
# # phase = np.random.uniform(0, 0.1) | |
# # # Generate sine signal based on the drawn frequency and phase | |
# # temp_data = [np.sin(freq * j + phase) for j in range(seq_len)] | |
# # temp.append(temp_data) | |
# # # Align row/column | |
# # temp = np.transpose(np.asarray(temp)) | |
# # # Normalize to [0,1] | |
# # temp = (temp + 1) * 0.5 | |
# # Stack the generated data | |
# # data.append(temp) | |
# # lrs = [] | |
# # for i in range(60): | |
# # lr_sched.step() | |
# # lrs.append( | |
# # optimizer.param_groups[0]["lr"] | |
# # ) | |
# temp = [] | |
# for k in range(dim): | |
# lrs = [] | |
# optimizer = torch.optim.SGD(model.parameters(), lr=0.3) | |
# lr_sched = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=7, T_mult=1, eta_min=0.005, last_epoch=-1) | |
# for _ in range(random.randint(1, 14)): | |
# lr_sched.step() | |
# for _ in range(seq_len): | |
# lr_sched.step() | |
# lrs.append( | |
# optimizer.param_groups[0]["lr"] | |
# ) | |
# temp.append(lrs) | |
# # lrs.append( | |
# # optimizer.param_groups[0]["lr"] | |
# # ) | |
# temp = np.transpose(np.asarray(temp)) | |
# data.append(temp) | |
# # plt.plot(lrs) | |
# # Restore RNG. | |
# np.random.set_state(st0) | |
# data = np.array(data) | |
# if save2npy: | |
# np.save( | |
# os.path.join(dir, f"sine_ground_truth_{seq_len}_{period}.npy"), data | |
# ) | |
# return data | |
# def mask_data(self, seed=2023): | |
# masks = np.ones_like(self.samples) | |
# # Store the state of the RNG to restore later. | |
# st0 = np.random.get_state() | |
# np.random.seed(seed) | |
# for idx in range(self.samples.shape[0]): | |
# x = self.samples[idx, :, :] # (seq_length, feat_dim) array | |
# mask = noise_mask( | |
# x, | |
# self.missing_ratio, | |
# self.mean_mask_length, | |
# self.style, | |
# self.distribution, | |
# ) # (seq_length, feat_dim) boolean array | |
# masks[idx, :, :] = mask | |
# if self.save2npy: | |
# np.save(os.path.join(self.dir, f"sine_masking_{self.window}.npy"), masks) | |
# # Restore RNG. | |
# np.random.set_state(st0) | |
# return masks.astype(bool) | |
# def __getitem__(self, ind): | |
# if self.period == "test": | |
# x = self.samples[ind, :, :] # (seq_length, feat_dim) array | |
# m = self.masking[ind, :, :] # (seq_length, feat_dim) boolean array | |
# return torch.from_numpy(x).float(), torch.from_numpy(m) | |
# x = self.samples[ind, :, :] # (seq_length, feat_dim) array | |
# return torch.from_numpy(x).float() | |
# def __len__(self): | |
# return self.sample_num | |