import os
import pickle

import numpy as np
import pandas as pd
import ta
import torch
from tqdm import tqdm
def load_model_config(model_dir: str):
    """Load the pickled model config stored alongside the checkpoint."""
    with open(os.path.join(model_dir, 'config.pkl'), 'rb') as f:
        config = pickle.load(f)
    return config
def load_model(model, model_dir: str, device: str = 'cuda'):
    """Load weights from `model.pth` in `model_dir` onto the given device."""
    state_dict = torch.load(os.path.join(model_dir, 'model.pth'),
                            map_location=torch.device(device))
    model.load_state_dict(state_dict)
    return model
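# Illustrative usage of the two loaders above; 'checkpoints/run1' and
# `MyModel` are placeholders for whatever checkpoint directory and model
# class the repo actually uses.
# >>> config = load_model_config('checkpoints/run1')
# >>> model = load_model(MyModel(**config), 'checkpoints/run1', device='cpu')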
def normalize(data, min_val, max_val):
    # data.shape = (bs, ts_size, z_dim)
    # Min-max scaling. For the round trip with renormalize() to hold,
    # `max_val` is the range after the min shift, i.e. max(data - min_val);
    # the epsilon guards against division by zero.
    data = data - min_val
    data = data / (max_val + 1e-7)
    return data
def renormalize(data, min_val, max_val):
    # data.shape = (bs, seq_len, z_dim)
    # Inverse of normalize(). Avoid in-place ops so the caller's array
    # is not silently mutated.
    data = data * max_val
    data = data + min_val
    return data
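# Round-trip sanity check for the two helpers above; the stats are computed
# the way the scaling assumes (per-feature minimum, then the post-shift range).
# >>> raw = np.random.rand(8, 24, 6) * 100          # (bs, ts_size, z_dim)
# >>> min_val = raw.min(axis=(0, 1))
# >>> max_val = (raw - min_val).max(axis=(0, 1))
# >>> restored = renormalize(normalize(raw, min_val, max_val), min_val, max_val)
# >>> np.allclose(raw, restored, atol=1e-3)
# True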
def train_test_split(data, ratio):
    """Randomly split rows into train/test subsets; `ratio` is the train fraction."""
    idx = np.random.permutation(len(data))
    train_idx = idx[:int(ratio * len(data))]
    test_idx = idx[int(ratio * len(data)):]
    train_data = data[train_idx, ...]
    test_data = data[test_idx, ...]
    return train_data, test_data
def load_data(ts_size, data):
    # data.shape = (rows, features)
    def sliding_window(ts_size, ori_data):
        # Flip the data so it runs in chronological order
        ori_data = ori_data[::-1]  # (len(csv), z_dim)
        # Turn (len(ori_data), z_dim) into (num_samples, seq_len, z_dim)
        samples = []
        for i in range(len(ori_data) - ts_size):
            single_sample = ori_data[i:i + ts_size]  # (seq_len, z_dim)
            samples.append(single_sample)
        samples = np.array(samples)  # (bs, seq_len, z_dim)
        np.random.shuffle(samples)  # Make it more like i.i.d.
        return samples
    data = sliding_window(ts_size=ts_size, ori_data=data)  # (bs, ts_size, z_dim)
    return data
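# Quick shape check for the sliding window (values illustrative):
# >>> rows = np.arange(30).reshape(10, 3)   # 10 time steps, 3 features
# >>> load_data(ts_size=4, data=rows).shape
# (6, 4, 3)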
def calculate_technical_indicators(df_passed: pd.DataFrame, rolling_window=50, handle_nan=True):
    df = df_passed.copy()
    def generate_indicators(df, rolling_window=50):
        # Calculate technical indicators
        # df['momentum'] = ta.momentum.roc(df['Close'])
        # df['trend'] = ta.trend.sma_indicator(df['Close'])
        # df['volatility'] = ta.volatility.bollinger_mavg(df['Close'])
        # df['volume'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['stoch'] = ta.momentum.stoch(df['High'], df['Low'], df['Close'])
        df['adx'] = ta.trend.adx(df['High'], df['Low'], df['Close'])
        df['bollinger_hband'] = ta.volatility.bollinger_hband(df['Close'])
        df['mfi'] = ta.volume.money_flow_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['rsi'] = ta.momentum.rsi(df['Close'])
        df['ma'] = ta.trend.sma_indicator(df['Close'])
        df['std'] = df['Close'].rolling(window=rolling_window).std()
        df['adl'] = ta.volume.acc_dist_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['williams'] = ta.momentum.williams_r(df['High'], df['Low'], df['Close'])
        df['macd'] = ta.trend.macd(df['Close'])
        df['obv'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['sar'] = ta.trend.psar_down(df['High'], df['Low'], df['Close'])
        df['ichimoku_a'] = ta.trend.ichimoku_a(df['High'], df['Low'])
        df['ichimoku_b'] = ta.trend.ichimoku_b(df['High'], df['Low'])
        return df
    df = generate_indicators(df=df, rolling_window=rolling_window)
    if not handle_nan:
        return df
    # The indicators need a warm-up window, so the first rows are NaN:
    # forward-fill, drop the warm-up region, then back-fill any leftovers.
    df = df.ffill()
    df = df.iloc[rolling_window + 1:]
    df = df.bfill()
    if df.isna().sum().sum() > 0:
        raise ValueError('NaN values found after filling')
    return df
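# Illustrative usage; the function expects capitalized OHLCV columns
# ('High', 'Low', 'Close', 'Volume'), as in yfinance-style exports.
# The file name is a placeholder.
# >>> df = pd.read_csv('AAPL.csv')
# >>> df = calculate_technical_indicators(df, rolling_window=50)
# >>> df[['rsi', 'macd', 'obv']].tail()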
def create_batches(all_symbols_df: pd.DataFrame,
                   trainer_config: dict,
                   model_config: dict):
    train_batches = torch.tensor([])
    val_batches = torch.tensor([])
    symbols = all_symbols_df['Symbol'].unique()
    for symbol in tqdm(symbols):
        df = all_symbols_df[all_symbols_df['Symbol'] == symbol]
        df = df.sort_values(by='Date')
        if trainer_config['calculate_technical_indicators']:
            df = calculate_technical_indicators(df, rolling_window=model_config['ts_size'])
        df = df[model_config['stock_features']]
        data = df.values
        train_data, val_data = train_test_split(data=data, ratio=trainer_config['split_ratio'])
        # Create batches (sliding window)
        train_data = load_data(ts_size=model_config['ts_size'], data=train_data)
        val_data = load_data(ts_size=model_config['ts_size'], data=val_data)
        if len(train_data) > 0:
            train_data = normalize(train_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            # Cast to float32 so the dtype matches the empty accumulator tensors
            train_data = torch.tensor(train_data).float()
            train_batches = torch.cat([train_batches, train_data])
        if len(val_data) > 0:
            val_data = normalize(val_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            val_data = torch.tensor(val_data).float()
            val_batches = torch.cat([val_batches, val_data])
    return train_batches, val_batches
def get_mini_batch(batch_size, data):
    """Sample a random mini-batch of `batch_size` sequences without replacement."""
    idx = np.random.permutation(len(data))
    idx = idx[:batch_size]
    data_mini = data[idx, ...]  # (bs, seq_len, z_dim)
    return data_mini
def generate_random_masks(num_samples, ts_size, mask_size, num_masks):
    # Each sample masks `num_masks` of the contiguous patches at random,
    # e.g. with one masked patch out of four (x = visible, o = masked):
    # xxxo
    # oxxx
    # xxox
    num_patches = ts_size // mask_size
    def single_sample_mask():
        idx = np.random.permutation(num_patches)[:num_masks]
        mask = np.zeros(ts_size, dtype=bool)
        for j in idx:
            mask[j * mask_size:(j + 1) * mask_size] = True
        return mask
    masks_list = [single_sample_mask() for _ in range(num_samples)]
    masks_list = [torch.tensor(mask) for mask in masks_list]
    masks = torch.stack(masks_list, dim=0)  # (num_samples, ts_size)
    return masks
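# Illustrative call: 8 samples over a 24-step series with 6-step patches,
# masking 2 of the 24 // 6 = 4 patches per sample.
# >>> masks = generate_random_masks(num_samples=8, ts_size=24, mask_size=6, num_masks=2)
# >>> masks.shape
# torch.Size([8, 24])
# >>> int(masks[0].sum())   # 2 patches x 6 steps
# 12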
def generate_pseudo_masks(ts_size, num_samples):
    # All-False masks, i.e. nothing is masked (x = visible):
    # xxxx
    # xxxx
    # xxxx
    masks = np.zeros((num_samples, ts_size), dtype=bool)
    return masks