"""Time-series dataset loaders: sliding-window CSV data, per-app revenue data, and fMRI simulation data."""
import os
import torch
import numpy as np
import pandas as pd
from scipy import io
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset
from models.model_utils import normalize_to_neg_one_to_one, unnormalize_to_zero_to_one
from utils.masking_utils import noise_mask
class CustomDataset(Dataset):
    """Sliding-window dataset over a single multivariate time series.

    Reads the raw series via ``read_data``, fits a MinMaxScaler, slices the
    scaled series into overlapping stride-1 windows of length ``window``, and
    randomly splits the windows into train/test sets. In ``test`` mode a
    boolean observation mask is also produced, either from random missingness
    (``missing_ratio``) or by hiding the last ``predict_length`` steps.
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """Build the windowed dataset.

        Args:
            name: dataset identifier, used in the saved ``.npy`` file names.
            data_root: path handed to ``read_data``.
            window: length of each sliding window.
            proportion: fraction of windows assigned to the train split.
            save2npy: if True, dump ground-truth and normalized splits to .npy.
            neg_one_to_one: if True, additionally rescale data to [-1, 1].
            seed: RNG seed for the train/test shuffle and the masking.
            period: "train" or "test".
            output_dir: directory under which a ``samples/`` folder is created.
            predict_length: (test only) number of trailing steps to mask out.
            missing_ratio: (test only) random missing ratio for ``noise_mask``.
            style, distribution, mean_mask_length: forwarded to ``noise_mask``
                when ``missing_ratio`` is given.
        """
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            # Masking options only make sense for the test split.
            assert not (
                predict_length is not None or missing_ratio is not None
            ), "predict_length/missing_ratio are test-only options."
        self.period = period
        self.name = name
        self.pred_len = predict_length
        self.missing_ratio = missing_ratio
        self.style = style
        self.distribution = distribution
        self.mean_mask_length = mean_mask_length
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)
        self.window, self.period = window, period
        self.len, self.var_num = self.rawdata.shape[0], self.rawdata.shape[-1]
        # Number of stride-1 windows that fit in the series (0 if too short).
        self.sample_num_total = max(self.len - self.window + 1, 0)
        self.save2npy = save2npy
        self.auto_norm = neg_one_to_one
        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference
        if period == "test":
            if missing_ratio is not None:
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                # Forecasting mask: observed everywhere except the horizon.
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()
        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    def getsamples(self, data, proportion, seed):
        """Slice ``data`` into stride-1 windows, split train/test, optionally save."""
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            x[i, :, :] = data[i : i + self.window, :]
        train_data, test_data = self.divide(x, proportion, seed)
        if self.save2npy:
            self._save_splits(train_data, test_data, proportion)
        return train_data, test_data

    def _save_splits(self, train_data, test_data, proportion):
        """Dump ground-truth (unscaled) and normalized splits to ``.npy`` files.

        The test-split files are only written when the split is non-empty
        (``proportion < 1``).
        """
        has_test = 1 - proportion > 0
        if has_test:
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_ground_truth_{self.window}_test.npy"
                ),
                self.unnormalize(test_data),
            )
        np.save(
            os.path.join(
                self.dir, f"{self.name}_ground_truth_{self.window}_train.npy"
            ),
            self.unnormalize(train_data),
        )
        # Normalized copies: map back from [-1, 1] to [0, 1] when auto-norm
        # is active, otherwise save the scaled data as-is.
        if self.auto_norm:
            norm_test = unnormalize_to_zero_to_one(test_data)
            norm_train = unnormalize_to_zero_to_one(train_data)
        else:
            norm_test, norm_train = test_data, train_data
        if has_test:
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                ),
                norm_test,
            )
        np.save(
            os.path.join(
                self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
            ),
            norm_train,
        )

    def normalize(self, sq):
        """Scale windows ``sq`` with the fitted scaler (and to [-1, 1] if enabled)."""
        d = sq.reshape(-1, self.var_num)
        d = self.scaler.transform(d)
        if self.auto_norm:
            d = normalize_to_neg_one_to_one(d)
        return d.reshape(-1, self.window, self.var_num)

    def unnormalize(self, sq):
        """Inverse of ``normalize``: map windows back to the raw data scale."""
        d = self.__unnormalize(sq.reshape(-1, self.var_num))
        return d.reshape(-1, self.window, self.var_num)

    def __normalize(self, rawdata):
        """Scale the flat raw series; optionally map to [-1, 1]."""
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        """Undo the optional [-1, 1] mapping, then invert the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)

    @staticmethod
    def divide(data, ratio, seed=2023):
        """Randomly split windows into (train, test) along axis 0.

        Seeds NumPy locally and restores the global RNG state afterwards, so
        callers' random streams are unaffected.

        Bug fix: this was a plain method without ``self``, so calling it as
        ``self.divide(...)`` raised TypeError; declared ``@staticmethod`` to
        keep the existing call sites working.
        """
        size = data.shape[0]
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        regular_train_num = int(np.ceil(size * ratio))
        id_rdm = np.random.permutation(size)
        regular_train_id = id_rdm[:regular_train_num]
        irregular_train_id = id_rdm[regular_train_num:]
        regular_data = data[regular_train_id, :]
        irregular_data = data[irregular_train_id, :]
        # Restore RNG.
        np.random.set_state(st0)
        return regular_data, irregular_data

    @staticmethod
    def read_data(filepath, name=""):
        """Read a single .csv and fit (but do not apply) a MinMaxScaler.

        For the 'etth' dataset the first column is dropped before use.

        Bug fix: same missing-``self`` problem as ``divide``; declared
        ``@staticmethod`` so ``self.read_data(data_root, name)`` binds correctly.
        """
        df = pd.read_csv(filepath, header=0)
        if name == "etth":
            df.drop(df.columns[0], axis=1, inplace=True)
        data = df.values
        scaler = MinMaxScaler().fit(data)
        return data, scaler

    def mask_data(self, seed=2023):
        """Build a per-sample boolean observation mask via ``noise_mask``.

        Deterministic for a given ``seed``; the global NumPy RNG state is
        saved and restored around the draw.
        """
        masks = np.ones_like(self.samples)
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        for idx in range(self.samples.shape[0]):
            x = self.samples[idx, :, :]  # (seq_length, feat_dim) array
            mask = noise_mask(
                x,
                self.missing_ratio,
                self.mean_mask_length,
                self.style,
                self.distribution,
            )  # (seq_length, feat_dim) boolean array
            masks[idx, :, :] = mask
        if self.save2npy:
            np.save(
                os.path.join(self.dir, f"{self.name}_masking_{self.window}.npy"), masks
            )
        # Restore RNG.
        np.random.set_state(st0)
        return masks.astype(bool)

    def __getitem__(self, ind):
        """Return a float window tensor; in test mode also its boolean mask."""
        if self.period == "test":
            x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
            m = self.masking[ind, :, :]  # (seq_length, feat_dim) boolean array
            return torch.from_numpy(x).float(), torch.from_numpy(m)
        x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
        return torch.from_numpy(x).float()

    def __len__(self):
        """Number of windows in the selected split."""
        return self.sample_num
class RevenueDataset(CustomDataset):
    """App-revenue dataset: the first ``window`` rows of each app become data.

    Each of (revenue, download, au) is min-max scaled per app before the
    windows are taken; the resulting array has columns
    (download, revenue, au).
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """See ``CustomDataset`` for parameter semantics; ``var_num`` is fixed at 3."""
        # Deliberately skips CustomDataset.__init__ (resolves to Dataset's)
        # and redoes the setup, because read_data here needs self.window first.
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            assert not (predict_length is not None or missing_ratio is not None), ""

        self.name, self.period = name, period
        self.pred_len, self.missing_ratio = predict_length, missing_ratio
        self.style, self.distribution = style, distribution
        self.mean_mask_length = mean_mask_length
        self.save2npy, self.auto_norm = save2npy, neg_one_to_one

        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)

        self.window = window
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        self.len = len(self.rawdata) // self.window  # one window per app
        self.var_num = 3  # download, revenue, au
        # NOTE(review): the inherited stride-1 getsamples will build windows
        # that straddle app boundaries; confirm that is intended.
        self.sample_num_total = self.len

        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference

        if period == "test":
            if missing_ratio is not None:
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()

        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    def read_data(self, filepath, name=""):
        """Load the revenue CSV and fit (but do not apply) a MinMaxScaler.

        Needs ``self.window``, hence an instance method unlike the parent's.
        """
        df = pd.read_csv(filepath)

        def scale_within_app(series):
            return (series - series.min()) / (series.max() - series.min())

        for variable in ["revenue", "download", "au"]:
            df[variable] = df.groupby("app_id")[variable].transform(scale_within_app)

        # First ``window`` rows of each app, in CSV row order
        # (assumes the file is already date-sorted — confirm).
        per_app = df.groupby("app_id").head(self.window)
        data = per_app[["download", "revenue", "au"]].values
        scaler = MinMaxScaler().fit(data)
        return data, scaler

    def __normalize(self, rawdata):
        """Scale with the fitted scaler; optionally map to [-1, 1]."""
        scaled = self.scaler.transform(rawdata)
        return normalize_to_neg_one_to_one(scaled) if self.auto_norm else scaled

    def __unnormalize(self, data):
        """Undo the optional [-1, 1] mapping, then invert the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)
class ControlRevenueDataset(RevenueDataset):
    """RevenueDataset variant whose train split is expanded with jittered copies."""

    def getsamples(self, data, proportion, seed):
        """Window the data, split train/test, augment the train split, and save.

        Returns:
            (train_data, test_data), each of shape (N, window, var_num);
            ``train_data`` includes four augmented copies (see ``_augment``)
            and is clipped to [-1, 1].
        """
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            x[i, :, :] = data[i : i + self.window, :]
        train_data, test_data = self.divide(x, proportion, seed)
        train_data = self._augment(train_data)

        if self.save2npy:
            has_test = 1 - proportion > 0
            if has_test:
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_ground_truth_{self.window}_test.npy"
                    ),
                    self.unnormalize(test_data),
                )
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_ground_truth_{self.window}_train.npy"
                ),
                self.unnormalize(train_data),
            )
            # Normalized copies: map back to [0, 1] when auto-norm is active.
            if self.auto_norm:
                norm_test = unnormalize_to_zero_to_one(test_data)
                norm_train = unnormalize_to_zero_to_one(train_data)
            else:
                norm_test, norm_train = test_data, train_data
            if has_test:
                np.save(
                    os.path.join(
                        self.dir, f"{self.name}_norm_truth_{self.window}_test.npy"
                    ),
                    norm_test,
                )
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_norm_truth_{self.window}_train.npy"
                ),
                norm_train,
            )
        return train_data, test_data

    def _augment(self, train_data):
        """Return ``train_data`` plus four jittered copies, clipped to [-1, 1].

        For each mean offset ``delta`` in linspace(-0.3, 0.3, 4), one Gaussian
        draw per time step (shared across samples) is added channel-wise, then
        each channel is min-max rescaled to [-1, 1].

        Bug fix: the original called ``random.seed(2023)`` (stdlib RNG) while
        drawing from ``np.random.normal``, so the augmentation was
        non-reproducible. We seed NumPy instead and save/restore the global
        RNG state, matching ``divide``/``mask_data`` in this module. The
        leftover debug ``print(delta)`` was also removed.
        """
        st0 = np.random.get_state()
        aug_data = []
        for delta in np.linspace(-0.3, 0.3, 4):
            np.random.seed(2023)
            tmp = train_data.copy()
            tmp[:, :, 0] += np.random.normal(delta, 2, tmp.shape[1]) / 10
            tmp[:, :, 1] += np.random.normal(delta, 0.15, tmp.shape[1]) / 10
            tmp[:, :, 2] += np.random.normal(delta, 0.1, tmp.shape[1]) / 10
            for c in range(3):
                lo = tmp[:, :, c].min()
                hi = tmp[:, :, c].max()
                # NOTE(review): divides by (hi - lo); a constant channel would
                # raise a zero-division — assumed non-constant here, confirm.
                tmp[:, :, c] = ((tmp[:, :, c] - lo) / (hi - lo) - 0.5) * 2
            aug_data.append(tmp)
        np.random.set_state(st0)
        return np.concatenate([train_data] + aug_data, axis=0).clip(-1, 1)

    # NOTE(review): due to Python name mangling, inherited callers resolve
    # __normalize/__unnormalize against the class that defines the calling
    # method, so these private copies appear unreachable. Kept for parity
    # with the sibling classes; confirm before removing.
    def __normalize(self, rawdata):
        """Scale with the fitted scaler; optionally map to [-1, 1]."""
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        """Undo the optional [-1, 1] mapping, then invert the scaler."""
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)
class fMRIDataset(CustomDataset):
    """fMRI simulation dataset (``sim4.mat``); defaults to all-train split."""

    def __init__(self, proportion=1.0, **kwargs):
        """Forward everything to ``CustomDataset``; ``proportion`` defaults to 1.0."""
        super().__init__(proportion=proportion, **kwargs)

    @staticmethod
    def read_data(filepath, name=""):
        """Load the 'ts' array from ``<filepath>/sim4.mat`` and fit a MinMaxScaler.

        Bug fix: this override was a plain function without ``self``, so the
        ``self.read_data(data_root, name)`` call in ``CustomDataset.__init__``
        raised TypeError; declared ``@staticmethod`` to bind correctly. The
        docstring also wrongly claimed it read a .csv.
        """
        data = io.loadmat(filepath + "/sim4.mat")["ts"]
        scaler = MinMaxScaler().fit(data)
        return data, scaler