import os
import pickle

import numpy as np
import pandas as pd
import ta
import torch
from tqdm import tqdm


def load_model_config(model_dir: str):
    # Load the pickled model configuration saved alongside the weights.
    with open(os.path.join(model_dir, 'config.pkl'), 'rb') as f:
        config = pickle.load(f)
    return config


def load_model(model, model_dir: str, device: str = 'cuda'):
    # Restore the saved weights onto the given model instance.
    state_dict = torch.load(os.path.join(model_dir, 'model.pth'), map_location=torch.device(device))
    model.load_state_dict(state_dict)
    return model


def normalize(data, min_val, max_val):
    # Min-max scale to roughly [0, 1].
    # data.shape = (bs, ts_size, z_dim)
    data = data - min_val
    data = data / (max_val + 1e-7)
    return data


def renormalize(data, min_val, max_val):
    # Inverse of normalize (up to the 1e-7 epsilon). Note: modifies `data` in place.
    # data.shape = (bs, seq_len, z_dim)
    data *= max_val
    data += min_val
    return data


def train_test_split(data, ratio):
    # Random split along the first axis.
    # Note: in create_batches this is applied to raw rows *before* windowing,
    # so the permutation breaks temporal ordering within the later windows.
    idx = np.random.permutation(len(data))
    train_idx = idx[:int(ratio * len(data))]
    test_idx = idx[int(ratio * len(data)):]
    train_data = data[train_idx, ...]
    test_data = data[test_idx, ...]
    return train_data, test_data


def load_data(ts_size, data):
    # data.shape = (rows, features)
    def sliding_window(ts_size, ori_data):
        # Flip the data so it runs in chronological order
        ori_data = ori_data[::-1]  # (len(csv), z_dim)
        # Turn (len(ori_data), z_dim) into (num_samples, seq_len, z_dim)
        samples = []
        for i in range(len(ori_data) - ts_size):
            single_sample = ori_data[i:i + ts_size]  # (seq_len, z_dim)
            samples.append(single_sample)
        samples = np.array(samples)  # (bs, seq_len, z_dim)
        np.random.shuffle(samples)  # Make the samples more i.i.d.-like
        return samples

    data = sliding_window(ts_size=ts_size, ori_data=data)  # (bs, ts_size, z_dim)
    return data


def calculate_technical_indicators(df_passed: pd.DataFrame, rolling_window=50, handle_nan=True):
    df = df_passed.copy()

    def generate_indicators(df, rolling_window=50):
        # Momentum, trend, volatility, and volume indicators from the `ta` library
        # df['momentum'] = ta.momentum.roc(df['Close'])
        # df['trend'] = ta.trend.sma_indicator(df['Close'])
        # df['volatility'] = ta.volatility.bollinger_mavg(df['Close'])
        # df['volume'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['stoch'] = ta.momentum.stoch(df['High'], df['Low'], df['Close'])
        df['adx'] = ta.trend.adx(df['High'], df['Low'], df['Close'])
        df['bollinger_hband'] = ta.volatility.bollinger_hband(df['Close'])
        df['mfi'] = ta.volume.money_flow_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['rsi'] = ta.momentum.rsi(df['Close'])
        df['ma'] = ta.trend.sma_indicator(df['Close'])
        df['std'] = df['Close'].rolling(window=rolling_window).std()
        df['adl'] = ta.volume.acc_dist_index(df['High'], df['Low'], df['Close'], df['Volume'])
        df['williams'] = ta.momentum.williams_r(df['High'], df['Low'], df['Close'])
        df['macd'] = ta.trend.macd(df['Close'])
        df['obv'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['sar'] = ta.trend.psar_down(df['High'], df['Low'], df['Close'])  # Added the 'close' argument
        df['ichimoku_a'] = ta.trend.ichimoku_a(df['High'], df['Low'])
        df['ichimoku_b'] = ta.trend.ichimoku_b(df['High'], df['Low'])
        return df

    df = generate_indicators(df=df, rolling_window=rolling_window)
    if not handle_nan:
        return df
    # Fill NaNs: forward-fill, drop the rolling-window warm-up rows, then
    # backward-fill anything left at the head.
    df = df.ffill()
    df = df.iloc[rolling_window + 1:]
    df = df.bfill()
    if df.isna().sum().sum() > 0:
        raise ValueError('NaN values found after filling')
    return df
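
# Illustrative usage of calculate_technical_indicators (a minimal sketch on
# synthetic OHLCV data; the frame below is a hypothetical stand-in, not part
# of the real pipeline):
def _example_indicator_usage():
    rng = np.random.default_rng(0)
    n = 200
    close = 100 + np.cumsum(rng.normal(size=n))  # random-walk closing prices
    df = pd.DataFrame({
        'High': close + rng.uniform(0.0, 1.0, size=n),
        'Low': close - rng.uniform(0.0, 1.0, size=n),
        'Close': close,
        'Volume': rng.integers(1_000, 10_000, size=n).astype(float),
    })
    enriched = calculate_technical_indicators(df, rolling_window=50)
    # Indicator columns are appended; warm-up rows are trimmed by handle_nan
    print(enriched.shape, enriched.columns.tolist())
    return enriched
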
def create_batches(all_symbols_df: pd.DataFrame, trainer_config: dict, model_config: dict):
    # Build normalized (bs, ts_size, z_dim) train/validation batches per symbol.
    train_list = []
    val_list = []
    symbols = all_symbols_df['Symbol'].unique()
    for symbol in tqdm(symbols):
        df = all_symbols_df[all_symbols_df['Symbol'] == symbol]
        df = df.sort_values(by='Date')
        if trainer_config['calculate_technical_indicators']:
            df = calculate_technical_indicators(df, rolling_window=model_config['ts_size'])
        df = df[model_config['stock_features']]
        data = df.values
        train_data, val_data = train_test_split(data=data, ratio=trainer_config['split_ratio'])
        # Create batches (sliding window)
        train_data = load_data(ts_size=model_config['ts_size'], data=train_data)
        val_data = load_data(ts_size=model_config['ts_size'], data=val_data)
        if len(train_data) > 0:
            train_data = normalize(train_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            train_list.append(torch.tensor(train_data, dtype=torch.float32))
        if len(val_data) > 0:
            val_data = normalize(val_data, min_val=model_config['min_val'], max_val=model_config['max_val'])
            val_list.append(torch.tensor(val_data, dtype=torch.float32))
    # Concatenate once at the end rather than repeatedly inside the loop
    train_batches = torch.cat(train_list) if train_list else torch.tensor([])
    val_batches = torch.cat(val_list) if val_list else torch.tensor([])
    return train_batches, val_batches


def get_mini_batch(batch_size, data):
    # Sample a random mini-batch along the first axis.
    idx = np.random.permutation(len(data))[:batch_size]
    data_mini = data[idx, ...]  # (bs, seq_len, z_dim)
    return data_mini


def generate_random_masks(num_samples, ts_size, mask_size, num_masks):
    # Mask `num_masks` of the contiguous patches per sample
    # (o = masked patch set to True, x = visible patch), e.g.:
    # xxxo
    # oxxx
    # xxox
    num_patches = ts_size // mask_size

    def single_sample_mask():
        idx = np.random.permutation(num_patches)[:num_masks]
        mask = np.zeros(ts_size, dtype=bool)
        for j in idx:
            mask[j * mask_size:(j + 1) * mask_size] = True
        return mask

    masks_list = [torch.from_numpy(single_sample_mask()) for _ in range(num_samples)]
    masks = torch.stack(masks_list, dim=0)  # (num_samples, ts_size)
    return masks


def generate_pseudo_masks(ts_size, num_samples):
    # All-False masks: nothing is masked, every timestep stays visible
    # xxxx
    # xxxx
    # xxxx
    masks = np.zeros((num_samples, ts_size), dtype=bool)
    return masks
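
# A minimal smoke test for the windowing, scaling, and masking helpers
# (synthetic data; every shape and value below is an illustrative assumption,
# not the project's real configuration):
if __name__ == '__main__':
    rows = np.random.rand(100, 5)               # (rows, features)
    windows = load_data(ts_size=24, data=rows)  # (76, 24, 5)
    print(windows.shape)

    # Assuming max_val is computed from the min-shifted data, as the
    # renormalize arithmetic implies, the round trip is near-exact
    min_val = windows.min()
    max_val = (windows - min_val).max()
    scaled = normalize(windows, min_val, max_val)
    restored = renormalize(scaled, min_val, max_val)
    print(np.abs(restored - windows).max())     # ~0, up to the 1e-7 epsilon

    masks = generate_random_masks(num_samples=4, ts_size=24, mask_size=6, num_masks=2)
    print(masks.shape, masks.sum(dim=1))        # each row masks 2 * 6 = 12 steps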