# TSEditor/utils/data_utils/real_datasets.py
# (stray HuggingFace file-viewer header — "PeterYu's picture / update / 2875fe6 /
#  raw / history blame / 15.7 kB" — commented out so the module parses)
import os
import torch
import numpy as np
import pandas as pd
from scipy import io
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset
from models.model_utils import normalize_to_neg_one_to_one, unnormalize_to_zero_to_one
from utils.masking_utils import noise_mask
class CustomDataset(Dataset):
    """Sliding-window dataset over a single multivariate time-series CSV.

    The raw series is min-max scaled (optionally mapped onto [-1, 1]), cut
    into overlapping windows of length ``window``, and split into train/test
    by a seeded random permutation.  In ``test`` mode a boolean mask is built
    for either imputation (``missing_ratio``) or forecasting
    (``predict_length``).
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """
        Args:
            name: Dataset name; prefix of the saved .npy files.
            data_root: Path to the source CSV file.
            window: Length of each sliding window (time steps).
            proportion: Fraction of windows assigned to the training split.
            save2npy: Persist the splits under ``output_dir`` as .npy files.
            neg_one_to_one: Map the [0, 1]-scaled data onto [-1, 1].
            seed: RNG seed for the split (and for masking in test mode).
            period: Either ``"train"`` or ``"test"``.
            output_dir: Root directory for saved samples.
            predict_length: Test-only; number of trailing steps to forecast.
            missing_ratio: Test-only; fraction of values masked for imputation.
            style, distribution, mean_mask_length: Forwarded to ``noise_mask``.
        """
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            # Masking/forecasting options only make sense at test time.
            assert not (predict_length is not None or missing_ratio is not None), (
                "predict_length/missing_ratio are test-only options"
            )
        self.period = period
        self.name = name
        self.pred_len = predict_length
        self.missing_ratio = missing_ratio
        self.style = style
        self.distribution = distribution
        self.mean_mask_length = mean_mask_length
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)
        self.window = window
        self.len, self.var_num = self.rawdata.shape[0], self.rawdata.shape[-1]
        # Number of overlapping windows (0 when the series is shorter than one window).
        self.sample_num_total = max(self.len - self.window + 1, 0)
        self.save2npy = save2npy
        self.auto_norm = neg_one_to_one
        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference
        if period == "test":
            if missing_ratio is not None:
                # Random imputation mask per sample.
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                # Forecasting mask: hide the last `predict_length` steps.
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()
        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    def getsamples(self, data, proportion, seed):
        """Slice ``data`` into overlapping windows and split into train/test."""
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            x[i, :, :] = data[i : i + self.window, :]
        train_data, test_data = self.divide(x, proportion, seed)
        if self.save2npy:
            self._save_splits(train_data, test_data, proportion)
        return train_data, test_data

    def _save_splits(self, train_data, test_data, proportion):
        """Persist ground-truth (raw scale) and normalized splits as .npy."""
        has_test = 1 - proportion > 0  # test file only exists for a real split

        def save(tag, split, arr):
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_{tag}_truth_{self.window}_{split}.npy"
                ),
                arr,
            )

        if has_test:
            save("ground", "test", self.unnormalize(test_data))
        save("ground", "train", self.unnormalize(train_data))
        # Normalized copies: map [-1, 1] back onto [0, 1] when auto-normalized.
        norm = unnormalize_to_zero_to_one if self.auto_norm else (lambda a: a)
        if has_test:
            save("norm", "test", norm(test_data))
        save("norm", "train", norm(train_data))

    def normalize(self, sq):
        """Scale windowed samples ``sq`` into the model's input range."""
        d = self.scaler.transform(sq.reshape(-1, self.var_num))
        if self.auto_norm:
            d = normalize_to_neg_one_to_one(d)
        return d.reshape(-1, self.window, self.var_num)

    def unnormalize(self, sq):
        """Inverse of :meth:`normalize`: map samples back to the raw scale."""
        d = self.__unnormalize(sq.reshape(-1, self.var_num))
        return d.reshape(-1, self.window, self.var_num)

    def __normalize(self, rawdata):
        # Name-mangled (`_CustomDataset__normalize`); subclasses that call
        # `self.__normalize` from their own methods must define their own copy.
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        if self.auto_norm:
            # [-1, 1] -> [0, 1] before inverting the MinMax scaling.
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)

    @staticmethod
    def divide(data, ratio, seed=2023):
        """Randomly split ``data`` along axis 0 into ``ratio`` / ``1 - ratio``.

        Uses a locally-seeded permutation and restores the global NumPy RNG
        state afterwards, so callers' randomness is unaffected.
        """
        size = data.shape[0]
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        regular_train_num = int(np.ceil(size * ratio))
        id_rdm = np.random.permutation(size)
        regular_data = data[id_rdm[:regular_train_num], :]
        irregular_data = data[id_rdm[regular_train_num:], :]
        # Restore RNG.
        np.random.set_state(st0)
        return regular_data, irregular_data

    @staticmethod
    def read_data(filepath, name=""):
        """Read a single CSV and fit a MinMaxScaler on its values."""
        df = pd.read_csv(filepath, header=0)
        if name == "etth":
            # ETTh CSVs carry a date column first; drop it.
            df.drop(df.columns[0], axis=1, inplace=True)
        data = df.values
        scaler = MinMaxScaler().fit(data)
        return data, scaler

    def mask_data(self, seed=2023):
        """Build a boolean imputation mask per sample via ``noise_mask``."""
        masks = np.ones_like(self.samples)
        # Store the state of the RNG to restore later.
        st0 = np.random.get_state()
        np.random.seed(seed)
        for idx in range(self.samples.shape[0]):
            x = self.samples[idx, :, :]  # (seq_length, feat_dim) array
            mask = noise_mask(
                x,
                self.missing_ratio,
                self.mean_mask_length,
                self.style,
                self.distribution,
            )  # (seq_length, feat_dim) boolean array
            masks[idx, :, :] = mask
        if self.save2npy:
            np.save(
                os.path.join(self.dir, f"{self.name}_masking_{self.window}.npy"), masks
            )
        # Restore RNG.
        np.random.set_state(st0)
        return masks.astype(bool)

    def __getitem__(self, ind):
        # Test mode returns (sample, mask); train mode returns the sample only.
        if self.period == "test":
            x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
            m = self.masking[ind, :, :]  # (seq_length, feat_dim) boolean array
            return torch.from_numpy(x).float(), torch.from_numpy(m)
        x = self.samples[ind, :, :]  # (seq_length, feat_dim) array
        return torch.from_numpy(x).float()

    def __len__(self):
        return self.sample_num
class RevenueDataset(CustomDataset):
    """Per-app revenue dataset over (download, revenue, au) channels.

    Unlike :class:`CustomDataset`'s sliding windows, each sample is the first
    ``window`` days of a single app, so windows never straddle two apps.
    """

    def __init__(
        self,
        name,
        data_root,
        window=64,
        proportion=0.8,
        save2npy=True,
        neg_one_to_one=True,
        seed=123,
        period="train",
        output_dir="./OUTPUT",
        predict_length=None,
        missing_ratio=None,
        style="separate",
        distribution="geometric",
        mean_mask_length=3,
    ):
        """Same arguments as :class:`CustomDataset`; see there for details."""
        # NOTE: deliberately bypasses CustomDataset.__init__ (its sliding-window
        # setup does not apply here) and initializes torch Dataset directly.
        super(CustomDataset, self).__init__()
        assert period in ["train", "test"], "period must be train or test."
        if period == "train":
            # Masking/forecasting options only make sense at test time.
            assert not (predict_length is not None or missing_ratio is not None), (
                "predict_length/missing_ratio are test-only options"
            )
        self.period = period
        self.name = name
        self.pred_len = predict_length
        self.missing_ratio = missing_ratio
        self.style = style
        self.distribution = distribution
        self.mean_mask_length = mean_mask_length
        self.dir = os.path.join(output_dir, "samples")
        os.makedirs(self.dir, exist_ok=True)
        self.window = window
        self.rawdata, self.scaler = self.read_data(data_root, self.name)
        # One sample per app (assumes every app contributes exactly `window`
        # rows after `read_data`'s head(window) — TODO confirm on the CSV).
        self.len = len(self.rawdata) // self.window
        self.var_num = 3  # download, revenue, au
        self.sample_num_total = self.len
        self.save2npy = save2npy
        self.auto_norm = neg_one_to_one
        self.data = self.__normalize(self.rawdata)
        train, inference = self.getsamples(self.data, proportion, seed)
        self.samples = train if period == "train" else inference
        if period == "test":
            if missing_ratio is not None:
                self.masking = self.mask_data(seed)
            elif predict_length is not None:
                masks = np.ones(self.samples.shape)
                masks[:, -predict_length:, :] = 0
                self.masking = masks.astype(bool)
            else:
                raise NotImplementedError()
        self.sample_num = self.samples.shape[0]
        print(f"Dataset load from {data_root} with shape {self.samples.shape}")

    def read_data(self, filepath, name=""):
        """Read the app-metrics CSV and return (values, fitted MinMaxScaler).

        Each metric is min-max scaled within its app, then the first
        ``self.window`` rows of every app are kept, with columns ordered
        (download, revenue, au).
        """
        df = pd.read_csv(filepath)
        # Per-app min-max scaling so apps of different magnitude are comparable.
        min_max_scale = lambda series: (series - series.min()) / (
            series.max() - series.min()
        )
        for variable in ["revenue", "download", "au"]:
            df[variable] = df.groupby("app_id")[variable].transform(min_max_scale)
        # First `window` rows of each app (presumably already date-sorted per
        # app — verify against the CSV producer).
        data = (
            df.groupby("app_id").head(self.window)[["download", "revenue", "au"]].values
        )
        scaler = MinMaxScaler().fit(data)
        return data, scaler

    def __normalize(self, rawdata):
        # Redefined here because name mangling makes `self.__normalize` inside
        # this class resolve to `_RevenueDataset__normalize`.
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        if self.auto_norm:
            # [-1, 1] -> [0, 1] before inverting the MinMax scaling.
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)
class ControlRevenueDataset(RevenueDataset):
    """RevenueDataset variant that augments the training split with jittered,
    re-scaled copies of itself (4 noise offsets -> 5x training data)."""

    def getsamples(self, data, proportion, seed):
        """Window the data, split train/test, then augment the train split.

        Augmentation: for each offset ``delta`` in ``linspace(-0.3, 0.3, 4)``
        add per-timestep Gaussian noise (channel-specific scales) to a copy of
        the training data, then min-max rescale every channel back to [-1, 1].
        """
        x = np.zeros((self.sample_num_total, self.window, self.var_num))
        for i in range(self.sample_num_total):
            x[i, :, :] = data[i : i + self.window, :]
        train_data, test_data = self.divide(x, proportion, seed)

        # BUG FIX: the original seeded the stdlib `random` module but drew the
        # jitter from `np.random`, so the augmentation was never reproducible.
        # Seed NumPy instead, and save/restore its global state, matching the
        # convention used by `divide` and `mask_data`.
        st0 = np.random.get_state()
        np.random.seed(2023)
        aug_data = []
        for delta in np.linspace(-0.3, 0.3, 4):
            tmp = train_data.copy()
            # One noise value per timestep, shared across samples; the per-
            # channel noise scales look empirically chosen — TODO confirm.
            tmp[:, :, 0] += np.random.normal(delta, 2, tmp.shape[1]) / 10
            tmp[:, :, 1] += np.random.normal(delta, 0.15, tmp.shape[1]) / 10
            tmp[:, :, 2] += np.random.normal(delta, 0.1, tmp.shape[1]) / 10
            for c in range(3):
                # Min-max rescale the channel back onto [-1, 1].
                lo = tmp[:, :, c].min()
                hi = tmp[:, :, c].max()
                tmp[:, :, c] = ((tmp[:, :, c] - lo) / (hi - lo) - 0.5) * 2
            aug_data.append(tmp)
        np.random.set_state(st0)
        train_data = np.concatenate([train_data] + aug_data, axis=0).clip(-1, 1)

        if self.save2npy:
            self._save_augmented_splits(train_data, test_data, proportion)
        return train_data, test_data

    def _save_augmented_splits(self, train_data, test_data, proportion):
        """Persist ground-truth (raw scale) and normalized splits as .npy."""
        has_test = 1 - proportion > 0  # test file only exists for a real split

        def save(tag, split, arr):
            np.save(
                os.path.join(
                    self.dir, f"{self.name}_{tag}_truth_{self.window}_{split}.npy"
                ),
                arr,
            )

        if has_test:
            save("ground", "test", self.unnormalize(test_data))
        save("ground", "train", self.unnormalize(train_data))
        # Normalized copies: map [-1, 1] back onto [0, 1] when auto-normalized.
        norm = unnormalize_to_zero_to_one if self.auto_norm else (lambda a: a)
        if has_test:
            save("norm", "test", norm(test_data))
        save("norm", "train", norm(train_data))

    def __normalize(self, rawdata):
        # NOTE(review): due to name mangling this copy is not reachable from
        # inherited methods; kept verbatim to preserve the class surface.
        data = self.scaler.transform(rawdata)
        if self.auto_norm:
            data = normalize_to_neg_one_to_one(data)
        return data

    def __unnormalize(self, data):
        if self.auto_norm:
            data = unnormalize_to_zero_to_one(data)
        return self.scaler.inverse_transform(data)
class fMRIDataset(CustomDataset):
    """Simulated fMRI time-series dataset loaded from a MATLAB file."""

    def __init__(self, proportion=1.0, **kwargs):
        # By default every window goes to the training split (proportion=1.0).
        super().__init__(proportion=proportion, **kwargs)

    @staticmethod
    def read_data(filepath, name=""):
        """Load the ``ts`` matrix from ``sim4.mat`` and fit a MinMaxScaler."""
        ts = io.loadmat(filepath + "/sim4.mat")["ts"]
        return ts, MinMaxScaler().fit(ts)