"""Reimplement TimeGAN-pytorch Codebase. Reference: Jinsung Yoon, Daniel Jarrett, Mihaela van der Schaar, "Time-series Generative Adversarial Networks," Neural Information Processing Systems (NeurIPS), 2019. Paper link: https://papers.nips.cc/paper/8789-time-series-generative-adversarial-networks Last updated Date: October 18th 2021 Code author: Zhiwei Zhang (bitzzw@gmail.com) ----------------------------- predictive_metrics.py Note: Use post-hoc RNN to classify original data and synthetic data Output: discriminative score (np.abs(classification accuracy - 0.5)) """ # Necessary Packages import tensorflow as tf import tensorflow._api.v2.compat.v1 as tf1 import numpy as np from sklearn.metrics import accuracy_score from utils.metric_utils import train_test_divide, extract_time def batch_generator(data, time, batch_size): """Mini-batch generator. Args: - data: time-series data - time: time information - batch_size: the number of samples in each batch Returns: - X_mb: time-series data in each batch - T_mb: time information in each batch """ no = len(data) idx = np.random.permutation(no) train_idx = idx[:batch_size] X_mb = list(data[i] for i in train_idx) T_mb = list(time[i] for i in train_idx) return X_mb, T_mb def discriminative_score_metrics(ori_data, generated_data): """Use post-hoc RNN to classify original data and synthetic data Args: - ori_data: original data - generated_data: generated synthetic data Returns: - discriminative_score: np.abs(classification accuracy - 0.5) """ # Initialization on the Graph tf1.reset_default_graph() # Basic Parameters no, seq_len, dim = np.asarray(ori_data).shape # Set maximum sequence length and each sequence length ori_time, ori_max_seq_len = extract_time(ori_data) generated_time, generated_max_seq_len = extract_time(ori_data) max_seq_len = max([ori_max_seq_len, generated_max_seq_len]) ## Builde a post-hoc RNN discriminator network # Network parameters hidden_dim = int(dim / 2) iterations = 2000 batch_size = 128 # Input place holders # Feature X = tf1.placeholder(tf.float32, [None, max_seq_len, dim], name="myinput_x") X_hat = tf1.placeholder(tf.float32, [None, max_seq_len, dim], name="myinput_x_hat") T = tf1.placeholder(tf.int32, [None], name="myinput_t") T_hat = tf1.placeholder(tf.int32, [None], name="myinput_t_hat") # discriminator function def discriminator(x, t): """Simple discriminator function. Args: - x: time-series data - t: time information Returns: - y_hat_logit: logits of the discriminator output - y_hat: discriminator output - d_vars: discriminator variables """ with tf1.variable_scope("discriminator", reuse=tf1.AUTO_REUSE) as vs: d_cell = tf1.nn.rnn_cell.GRUCell( num_units=hidden_dim, activation=tf.nn.tanh, name="d_cell" ) d_outputs, d_last_states = tf1.nn.dynamic_rnn( d_cell, x, dtype=tf.float32, sequence_length=t ) # y_hat_logit = tf1.contrib.layers.fully_connected(d_last_states, 1, activation_fn=None) y_hat_logit = tf1.layers.dense(d_last_states, 1, activation=None) y_hat = tf.nn.sigmoid(y_hat_logit) d_vars = [v for v in tf1.all_variables() if v.name.startswith(vs.name)] return y_hat_logit, y_hat, d_vars y_logit_real, y_pred_real, d_vars = discriminator(X, T) y_logit_fake, y_pred_fake, _ = discriminator(X_hat, T_hat) # Loss for the discriminator d_loss_real = tf1.reduce_mean( tf1.nn.sigmoid_cross_entropy_with_logits( logits=y_logit_real, labels=tf1.ones_like(y_logit_real) ) ) d_loss_fake = tf1.reduce_mean( tf1.nn.sigmoid_cross_entropy_with_logits( logits=y_logit_fake, labels=tf1.zeros_like(y_logit_fake) ) ) d_loss = d_loss_real + d_loss_fake # optimizer d_solver = tf1.train.AdamOptimizer().minimize(d_loss, var_list=d_vars) ## Train the discriminator # Start session and initialize sess = tf1.Session() sess.run(tf1.global_variables_initializer()) # Train/test division for both original and generated data ( train_x, train_x_hat, test_x, test_x_hat, train_t, train_t_hat, test_t, test_t_hat, ) = train_test_divide(ori_data, generated_data, ori_time, generated_time) from tqdm.auto import tqdm # Training step for itt in tqdm(range(iterations), desc="training", total=iterations): # Batch setting X_mb, T_mb = batch_generator(train_x, train_t, batch_size) X_hat_mb, T_hat_mb = batch_generator(train_x_hat, train_t_hat, batch_size) # Train discriminator _, step_d_loss = sess.run( [d_solver, d_loss], feed_dict={X: X_mb, T: T_mb, X_hat: X_hat_mb, T_hat: T_hat_mb}, ) ## Test the performance on the testing set y_pred_real_curr, y_pred_fake_curr = sess.run( [y_pred_real, y_pred_fake], feed_dict={X: test_x, T: test_t, X_hat: test_x_hat, T_hat: test_t_hat}, ) y_pred_final = np.squeeze( np.concatenate((y_pred_real_curr, y_pred_fake_curr), axis=0) ) y_label_final = np.concatenate( ( np.ones( [ len(y_pred_real_curr), ] ), np.zeros( [ len(y_pred_fake_curr), ] ), ), axis=0, ) # Compute the accuracy acc = accuracy_score(y_label_final, (y_pred_final > 0.5)) fake_acc = accuracy_score( np.zeros( [ len(y_pred_fake_curr), ] ), (y_pred_fake_curr > 0.5), ) real_acc = accuracy_score( np.ones( [ len(y_pred_fake_curr), ] ), (y_pred_real_curr > 0.5), ) # print("Fake Accuracy: ", fake_acc) # print("Real Accuracy: ", real_acc) discriminative_score = np.abs(0.5 - acc) return discriminative_score, fake_acc, real_acc