# Source: "Upload utils.py" by Huy0502 (commit 5730ef0, verified).
import os
import pickle
import torch
import numpy as np
import pandas as pd
import ta
from tqdm import tqdm
def load_model_config(model_dir: str):
    """Read the pickled model configuration stored in ``model_dir``.

    Args:
        model_dir: Directory that contains a ``config.pkl`` file.

    Returns:
        Whatever object was pickled into ``<model_dir>/config.pkl``.
    """
    config_path = os.path.join(model_dir, 'config.pkl')
    # NOTE: unpickling can execute arbitrary code; only load configs from
    # directories you trust.
    with open(config_path, 'rb') as handle:
        return pickle.load(handle)
def load_model(model, model_dir: str, device: str = 'cuda'):
    """Populate ``model`` with the weights saved in ``<model_dir>/model.pth``.

    Args:
        model: Object exposing ``load_state_dict`` (e.g. a ``torch.nn.Module``).
        model_dir: Directory that contains the ``model.pth`` checkpoint.
        device: Device the stored tensors are mapped onto while loading.

    Returns:
        The same ``model`` instance, after its state dict has been loaded.
    """
    checkpoint_path = os.path.join(model_dir, 'model.pth')
    target_device = torch.device(device)
    weights = torch.load(checkpoint_path, map_location=target_device)
    model.load_state_dict(weights)
    return model
def normalize(data, min_val, max_val):
    """Shift ``data`` by ``min_val`` and scale it by ``max_val``.

    Evaluates ``(data - min_val) / (max_val + 1e-7)`` and returns the result;
    ``data`` itself is left untouched.

    Args:
        data: Values to scale; the original author notes the expected shape
            as (bs, ts_size, z_dim).
        min_val: Offset subtracted from ``data``.
        max_val: Divisor applied after the shift.

    Returns:
        The shifted and scaled values.
    """
    eps = 1e-7  # keeps the division finite when max_val is 0
    shifted = data - min_val
    return shifted / (max_val + eps)
def renormalize(data, min_val, max_val):
    """Map normalized values back to the original scale.

    Computes ``data * max_val + min_val``. The result is returned as a new
    object: the caller's ``data`` is not modified, which matches how
    ``normalize`` treats its input and also lets integer-typed inputs be
    rescaled with float parameters.

    ``normalize`` divides by ``max_val + 1e-7`` while this function multiplies
    by ``max_val``, so a normalize/renormalize round trip is approximate
    rather than exact.

    Args:
        data: Normalized values; the original author notes the expected shape
            as (bs, seq_len, z_dim).
        min_val: Offset added back after scaling.
        max_val: Factor the data is multiplied by.

    Returns:
        The rescaled values.
    """
    return data * max_val + min_val
def train_test_split(data, ratio):
    """Randomly partition ``data`` along its first axis.

    A random permutation of the indices is drawn from NumPy's global RNG; the
    first ``int(ratio * len(data))`` permuted entries form the first part and
    the remainder forms the second part.

    Args:
        data: Indexable collection supporting ``data[index_array, ...]``.
        ratio: Fraction of entries assigned to the first (train) part.

    Returns:
        Tuple ``(train_data, test_data)``.
    """
    total = len(data)
    cut = int(ratio * total)
    order = np.random.permutation(total)
    train_part, test_part = order[:cut], order[cut:]
    return data[train_part, ...], data[test_part, ...]
def load_data(ts_size, data):
    """Cut ``data`` into shuffled, overlapping windows of ``ts_size`` rows.

    The rows are first reversed (the original author's comment says this is
    to make the data chronological, which presumes the input arrives
    newest-first). Windows start at every offset in
    ``range(len(data) - ts_size)``, are stacked into one array, and are then
    shuffled in place along the first axis using NumPy's global RNG.

    Args:
        ts_size: Number of consecutive rows per window.
        data: Row-major array, noted by the author as (rows, features).

    Returns:
        Array of windows, (num_windows, ts_size, features); an empty array of
        shape (0,) when ``len(data) <= ts_size``.
    """
    flipped = data[::-1]  # (rows, features), row order reversed
    window_count = len(flipped) - ts_size
    windows = np.array(
        [flipped[start:start + ts_size] for start in range(window_count)]
    )  # (num_windows, ts_size, features)
    np.random.shuffle(windows)  # make the samples look more i.i.d.
    return windows
def calculate_technical_indicators(df_passed: pd.DataFrame, rolling_window=50, handle_nan=True):
    """Return a copy of ``df_passed`` extended with technical-indicator columns.

    The input must provide ``High``, ``Low``, ``Close`` and ``Volume`` columns.
    Indicators come from the ``ta`` package, called without explicit window
    arguments (so the library defaults apply), plus a rolling standard
    deviation of ``Close``.

    Args:
        df_passed: Price data; it is copied, never modified.
        rolling_window: Window of the rolling standard deviation (``std``
            column) and basis for the number of leading rows dropped when
            ``handle_nan`` is true.
        handle_nan: When false, the frame is returned right after the
            indicator columns are added, NaNs included.

    Returns:
        The augmented DataFrame. With ``handle_nan`` true, the first
        ``rolling_window + 1`` rows are removed and remaining gaps are filled.

    Raises:
        ValueError: If NaN values remain after forward/backward filling.
    """
    df = df_passed.copy()
    high, low, close, volume = df['High'], df['Low'], df['Close'], df['Volume']

    # Column insertion order is kept stable because callers may select
    # features by position or rely on the resulting column layout.
    df['stoch'] = ta.momentum.stoch(high, low, close)
    df['adx'] = ta.trend.adx(high, low, close)
    df['bollinger_hband'] = ta.volatility.bollinger_hband(close)
    df['mfi'] = ta.volume.money_flow_index(high, low, close, volume)
    df['rsi'] = ta.momentum.rsi(close)
    df['ma'] = ta.trend.sma_indicator(close)
    df['std'] = close.rolling(window=rolling_window).std()
    df['adl'] = ta.volume.acc_dist_index(high, low, close, volume)
    df['williams'] = ta.momentum.williams_r(high, low, close)
    df['macd'] = ta.trend.macd(close)
    df['obv'] = ta.volume.on_balance_volume(close, volume)
    df['sar'] = ta.trend.psar_down(high, low, close)
    df['ichimoku_a'] = ta.trend.ichimoku_a(high, low)
    df['ichimoku_b'] = ta.trend.ichimoku_b(high, low)

    if not handle_nan:
        return df

    # DataFrame.ffill()/bfill() replace the deprecated fillna(method=...).
    # Forward-fill first, drop the leading rows (presumably the indicator
    # warm-up period), then back-fill whatever still starts with NaN.
    df = df.ffill()
    df = df.iloc[rolling_window + 1:]
    df = df.bfill()

    if df.isna().any().any():
        raise ValueError('NaN values found')
    return df
def create_batches(all_symbols_df: pd.DataFrame,
                   trainer_config: dict,
                   model_config: dict):
    """Build normalized train/validation window tensors across all symbols.

    For every distinct value in the ``Symbol`` column the rows are sorted by
    ``Date``, optionally extended with technical indicators, reduced to the
    configured feature columns, split into train/validation parts, windowed
    with ``load_data`` and normalized with ``normalize``.

    Per-symbol tensors are collected in lists and concatenated once at the
    end, which avoids re-copying the accumulated tensor on every iteration.

    Args:
        all_symbols_df: Frame with ``Symbol`` and ``Date`` columns plus the
            columns named in ``model_config['stock_features']`` (and
            ``High``/``Low``/``Close``/``Volume`` when indicators are enabled).
        trainer_config: Reads ``calculate_technical_indicators`` (bool) and
            ``split_ratio``.
        model_config: Reads ``ts_size``, ``stock_features``, ``min_val`` and
            ``max_val``.

    Returns:
        Tuple ``(train_batches, val_batches)`` of tensors stacked along the
        first axis; a side with no windows is an empty ``torch.tensor([])``.
    """
    ts_size_key = 'ts_size'
    train_chunks = []
    val_chunks = []

    for symbol in tqdm(all_symbols_df['Symbol'].unique()):
        df = all_symbols_df[all_symbols_df['Symbol'] == symbol]
        df = df.sort_values(by='Date')
        if trainer_config['calculate_technical_indicators']:
            df = calculate_technical_indicators(df, rolling_window=model_config[ts_size_key])
        data = df[model_config['stock_features']].values

        # NOTE(review): train_test_split permutes the rows at random before
        # load_data builds its sliding windows, so a window is not made of
        # consecutive dates. Behavior is kept as-is here; confirm whether a
        # chronological split was intended.
        train_data, val_data = train_test_split(data=data, ratio=trainer_config['split_ratio'])

        # Create batches (sliding window); train first, then validation, so
        # the order of RNG draws matches the original implementation.
        for rows, chunks in ((train_data, train_chunks), (val_data, val_chunks)):
            windows = load_data(ts_size=model_config[ts_size_key], data=rows)
            if len(windows) > 0:
                windows = normalize(windows,
                                    min_val=model_config['min_val'],
                                    max_val=model_config['max_val'])
                chunks.append(torch.tensor(windows))

    train_batches = torch.cat(train_chunks) if train_chunks else torch.tensor([])
    val_batches = torch.cat(val_chunks) if val_chunks else torch.tensor([])
    return train_batches, val_batches
def get_mini_batch(batch_size, data):
    """Draw up to ``batch_size`` random entries of ``data`` without repeats.

    Indices come from a random permutation (NumPy global RNG) truncated to
    ``batch_size``, so fewer entries are returned when ``data`` is shorter.

    Args:
        batch_size: Maximum number of entries to return.
        data: Indexable collection supporting ``data[index_array, ...]``;
            noted by the author as (bs, seq_len, z_dim).

    Returns:
        The selected entries, in permuted order.
    """
    picked = np.random.permutation(len(data))[:batch_size]
    return data[picked, ...]  # (bs, seq_len, z_dim)
def generate_random_masks(num_samples, ts_size, mask_size, num_masks):
    """Create one random patch mask per sample.

    Each time axis of length ``ts_size`` is divided into
    ``ts_size // mask_size`` patches of ``mask_size`` steps. For every sample,
    up to ``num_masks`` distinct patches are chosen at random (NumPy global
    RNG) and set to True; any leftover steps past the last full patch stay
    False.

    Illustration from the original author (presumably ``o`` marks a True
    patch):
        xxxo
        oxxx
        xxox

    Args:
        num_samples: Number of mask rows to generate.
        ts_size: Length of each mask row.
        mask_size: Number of time steps per patch.
        num_masks: Number of patches set to True in each row.

    Returns:
        Boolean tensor of shape (num_samples, ts_size).
    """
    patch_count = int(ts_size // mask_size)

    def _draw_row():
        chosen = np.random.permutation(patch_count)[:num_masks]
        row = np.zeros(ts_size, dtype=bool)
        for patch in chosen:
            start = patch * mask_size
            row[start:start + mask_size] = True
        return torch.tensor(row)

    rows = []
    for _ in range(num_samples):
        rows.append(_draw_row())
    return torch.stack(rows, dim=0)  # (num_samples, ts_size)
def generate_pseudo_masks(ts_size, num_samples):
    """Return an all-False mask, i.e. a mask that selects nothing.

    Illustration from the original author:
        xxxx
        xxxx
        xxxx

    Note that the result is a NumPy array, whereas ``generate_random_masks``
    returns a torch tensor.

    Args:
        ts_size: Length of each mask row.
        num_samples: Number of mask rows.

    Returns:
        Boolean ``numpy.ndarray`` of shape (num_samples, ts_size), all False.
    """
    shape = (num_samples, ts_size)
    return np.full(shape, False, dtype=bool)