Spaces:
Sleeping
Sleeping
import gradio as gr | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import matplotlib | |
from sklearn.preprocessing import StandardScaler | |
import plotly.graph_objects as go | |
import tensorflow as tf | |
import joblib | |
import os | |
from mpltern.datasets import get_triangular_grid | |
from itertools import product | |
# from llama_cpp import Llama | |
import time | |
import networkx as nx | |
import matplotlib.patheffects as pe | |
from huggingface_hub import hf_hub_download | |
from huggingface_hub import list_repo_files | |
import shutil | |
import tempfile | |
csv_cache = {} | |
# --- Dark mode setup --- | |
matplotlib.rcParams.update({ | |
'axes.facecolor': '#111111', | |
'figure.facecolor': '#111111', | |
'axes.edgecolor': 'white', | |
'axes.labelcolor': 'white', | |
'xtick.color': 'white', | |
'ytick.color': 'white', | |
'text.color': 'white', | |
'savefig.facecolor': '#111111', | |
}) | |
# Creating the ternary mesh | |
x_ = np.linspace(1, 98, 98)[::-1] | |
y_ = np.linspace(1, 98, 98)[::-1] | |
z_ = np.linspace(1, 98, 98)[::-1] | |
x__, y__, z__ = np.meshgrid(x_, y_, z_, indexing='ij') | |
x, y, z = np.where(x__ + y__ + z__ == 100) | |
max(x_[x] + y_[y] + z_[z]) | |
generation_mix = np.concatenate([x_[x].reshape(x.shape[0],1),z_[z].reshape(x.shape[0],1),y_[y].reshape(x.shape[0],1)],1) | |
image_options = ["distribution.png", "contribution.png", "voltage_violations.png"] | |
donut_data = { | |
"Nine-Bus-Load-Increase-Event": { | |
"values": [4239, 612, 207, 393], | |
"titles": ["Stable\nCases", "Unstable\nCases", "Failed\nCases", "Voltage\nViolations"] | |
}, | |
"Nine-Bus-Short-Circuit-Event": { | |
"values": [4239, 612, 207, 393], | |
"titles": ["Stable\nCases", "Unstable\nCases", "Failed\nCases", "Voltage\nViolations"] | |
} | |
} | |
def plot_adjacency_graph(ix, folder, layer): | |
result = test_read(int(ix), folder) | |
print("DEBUG: test_read returned type:", type(result)) | |
print("DEBUG: test_read content preview:", repr(result)) | |
if not isinstance(result, tuple) or len(result) != 6: | |
raise ValueError("Expected 6 outputs from test_read(), but got something else.") | |
X_scaled, adj_matrices, y_labels_bin, largest_eig, df_clean, df_raw = result | |
last_graph = adj_matrices[-1, -1, :, :, :] | |
fig = visualize_adjacency_2d_clean(last_graph, layer) | |
return fig | |
def update_metrics_on_folder_change(folder): | |
values_map = { | |
"Nine-Bus-Load-Increase-Event": [4239, 612, 207, 393, 47], | |
"Nine-Bus-Short-Circuit-Event": [3656, 1195, 4312, 539, 30], | |
} | |
values = values_map.get(folder, [0, 0, 0, 0, 0]) | |
return [ | |
f""" | |
<div style='display: flex; flex-direction: column; align-items: center; text-align: center; width: 100%;'> | |
<div class='metric-label'>{init_donut_titles[i]}</div> | |
<div class='metric-value'>{values[i]:,}</div> | |
</div> | |
""" for i in range(5) | |
] | |
def toggle_auto_mode(current_state): | |
new_state = not current_state | |
return ( | |
new_state, | |
gr.update( | |
value="Auto-Predict (ON)" if new_state else "Auto-Predict (OFF)", | |
elem_classes=["auto-on"] if new_state else ["auto-off"] | |
) | |
) | |
def maybe_auto_predict(sg, gfm, gfl, folder, auto_enabled): | |
time.sleep(5) | |
if auto_enabled: | |
ix = find_index(sg, 100 - sg - gfm, gfm) | |
ix = find_index(sg, 100 - sg - gfm, gfm) | |
return plot_result(ix, folder) | |
return gr.update() # no update if auto-predict is off | |
def maybe_auto_graph(sg, gfm, gfl, folder, auto_enabled): | |
if auto_enabled: | |
ix = find_index(sg, 100 - sg - gfm, gfm) | |
return plot_adjacency_graph(ix, folder) | |
return gr.update() # no update if auto-predict is off | |
def make_donut(value, title, fill_color='#00FFAA'): | |
frac = value / 4851 | |
fig, ax = plt.subplots(figsize=(2.4, 2.4), dpi=100) | |
fig.patch.set_facecolor("#1c1c1c") | |
# Donut | |
ax.pie( | |
[frac, 1 - frac], | |
radius=1, | |
startangle=90, | |
colors=[fill_color, '#333333'], | |
wedgeprops=dict(width=0.1) | |
) | |
ax.set(aspect="equal") | |
# Inset label inside the hole | |
ax.text( | |
0, 0.05, # slight upward nudge | |
f"{title}\n{int(value)} / 4851", | |
ha='center', | |
va='center', | |
fontsize=12, | |
color='white' | |
) | |
# fig.patch.set_facecolor('#111111') | |
plt.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05) # keep layout tight | |
return fig | |
def update_donuts(values, titles): | |
return [ | |
make_donut(values[0], titles[0]), | |
make_donut(values[1], titles[1]), | |
make_donut(values[2], titles[2]), | |
make_donut(values[3], titles[3]), | |
] | |
def cycle_image(current_index, folder): | |
next_index = (current_index + 1) % len(image_options) | |
filename = image_options[next_index] | |
try: | |
# Download to cache | |
cached_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename=filename, | |
repo_type="dataset", | |
token=os.getenv("DATA_ACCESS"), | |
) | |
# Copy to temp | |
temp_path = os.path.join(tempfile.gettempdir(), f"{folder}_{filename}") | |
shutil.copy(cached_path, temp_path) | |
return temp_path, next_index | |
except Exception as e: | |
print(f"cycle_image error: {e}") | |
return None, current_index | |
def get_distribution_image(folder): | |
try: | |
# Download to cache | |
cached_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename="distribution.png", | |
token=os.getenv("DATA_ACCESS"), | |
repo_type="dataset" | |
) | |
# Copy to temp | |
temp_path = os.path.join(tempfile.gettempdir(), f"{folder}_distribution.png") | |
shutil.copy(cached_path, temp_path) | |
return temp_path | |
except Exception as e: | |
print(f"get_distribution_image error: {e}") | |
return None | |
def update_donuts_on_folder_change(folder): | |
data = donut_data.get(folder, {"values": [0]*4, "titles": [f"Donut {i+1}" for i in range(4)]}) | |
colors = ['#0040FF', '#FF00AA', '#AA00FF', '#00FFFF'] # Neon blue, red, purple, cyan | |
figs = [] | |
for i in range(4): | |
fig = make_donut(data["values"][i], data["titles"][i], fill_color=colors[i]) | |
figs.append(fig) | |
return figs | |
def get_folder_info(folder): | |
folder_map = { | |
"Nine-Bus-Load-Increase-Event": "🟦 Load Increase Dataset \nSimulation Duration: 60 s \nTime Resolution: 0.001 s \nEvent: 10% global load increase \nEvent Start: s = 20\nEvent Duration: 40 s", | |
"Nine-Bus-Short-Circuit-Event": "🟥 Short-Circuit Dataset \nSimulation Duration: 60 s \nTime Resolution: 0.001 s \nEvent: Short-circuit at Line 4-5 \nEvent Start: s = 20\nEvent Duration: 0.05 s" | |
} | |
description = folder_map.get(folder, "ℹ️ No description available.") | |
try: | |
files = list_repo_files( | |
repo_id=f"SevatarOoi/{folder}", | |
repo_type="dataset", | |
token=os.getenv("DATA_ACCESS"), | |
) | |
csv_files = [f for f in files if f.endswith("_100.csv")] | |
num_files = len(csv_files) | |
stats = f"📁 Files: {num_files} \n💾 Size: N/A (remote access)" | |
except Exception as e: | |
print(f"⚠️ Failed to list files in {folder}: {e}") | |
stats = "📁 Files: 0 \n💾 Size: N/A" | |
return f"{description}\n\n{stats}" | |
# def load_csv(ix, folder): | |
# cache_key = (folder, ix) # key must include folder | |
# if cache_key not in csv_cache: | |
# filepath = f'./{folder}/{ix}_100.csv' | |
# if not os.path.exists(filepath): | |
# return None | |
# try: | |
# df = pd.read_csv( | |
# filepath, | |
# header=None, | |
# on_bad_lines='skip', | |
# low_memory=False, | |
# na_values=["-nan(ind)", "nan", "NaN", ""] | |
# ) | |
# df = df.fillna(0) | |
# csv_cache[cache_key] = df | |
# except Exception as e: | |
# print(f"Failed to load {filepath}: {e}") | |
# return None | |
# return csv_cache[cache_key] | |
def load_csv(ix, folder): | |
cache_key = (folder, ix) | |
if cache_key not in csv_cache: | |
try: | |
csv_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename=f"{ix}_100.csv", | |
token=os.getenv("DATA_ACCESS"), | |
repo_type="dataset" | |
) | |
df = pd.read_csv( | |
csv_path, | |
header=None, | |
on_bad_lines='skip', | |
low_memory=False, | |
na_values=["-nan(ind)", "nan", "NaN", ""] | |
).fillna(0) | |
csv_cache[cache_key] = df | |
except Exception as e: | |
print(f"❌ Failed to load {ix} in {folder}: {e}") | |
return None | |
return csv_cache[cache_key] | |
def blank_figure(msg="No data available"): | |
fig, ax = plt.subplots() | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.text(0.5, 0.5, msg, ha='center', va='center', color='white', fontsize=12) | |
ax.axis('off') | |
# fig.patch.set_facecolor('#111111') | |
return fig | |
def clear_csv_cache(): | |
csv_cache.clear() | |
# --- Ternary plot callback --- | |
def mpltern_plot(sg=None, gfl=None, gfm=None): | |
t, l, r = get_triangular_grid(n=21) | |
fig = plt.figure(figsize=(6, 6)) | |
fig.patch.set_facecolor("#1c1c1c") | |
ax = fig.add_subplot(projection='ternary', ternary_sum=100) | |
ax.triplot(t, l, r, color='grey', alpha=0.3) | |
if sg is not None and gfl is not None and gfm is not None: | |
ax.scatter(sg, gfm, gfl, color='white', s=40, edgecolor='black', label='Selected') | |
ax.set_tlabel('η(SG) [%]') | |
ax.set_llabel('η(GFM) [%]') | |
ax.set_rlabel('η(GFL) [%]') | |
ax.taxis.set_label_position('tick1') | |
ax.laxis.set_label_position('tick1') | |
ax.raxis.set_label_position('tick1') | |
ax.taxis.set_tick_params(tick2On=True, colors='#0096FF', grid_color='grey') | |
ax.laxis.set_tick_params(tick2On=True, colors='#FFFF00', grid_color='grey') | |
ax.raxis.set_tick_params(tick2On=True, colors='#39FF14', grid_color='grey') | |
ax.taxis.label.set_color('#0096FF') | |
ax.laxis.label.set_color('#FFFF00') | |
ax.raxis.label.set_color('#39FF14') | |
ax.spines['tside'].set_color('#39FF14') | |
ax.spines['lside'].set_color('#0096FF') | |
ax.spines['rside'].set_color('#FFFF00') | |
# ax.set_title("15% Active Load Increase - Incomplete, Unstable Cases", color='white') | |
return fig | |
def on_sg_change(sg_val, gfm_val, gfl_val, folder): | |
sg_val = int(np.clip(sg_val, 1, 98)) | |
remaining = 100 - sg_val | |
gfm_val = int(np.clip(gfm_val, 1, remaining - 1)) | |
gfl_val = 100 - sg_val - gfm_val | |
if gfl_val < 1: | |
gfl_val = 1 | |
gfm_val = 100 - sg_val - gfl_val | |
ix = find_index(sg_val, gfl_val, gfm_val) | |
if ix == "Not found" or load_csv(ix, folder) is None: | |
return sg_val, gfm_val, gfl_val, "", mpltern_plot(sg_val, gfl_val, gfm_val), blank_figure("No voltage"), blank_figure("No generation"), blank_figure("No eigenvalues") | |
return sg_val, gfm_val, gfl_val, ix, mpltern_plot(sg_val, gfl_val, gfm_val), voltage_plot(ix, folder), generation_plot(ix, folder), eigenvalue_plot(ix, folder) | |
def on_gfm_change(sg_val, gfm_val, gfl_val, folder): | |
gfm_val = int(np.clip(gfm_val, 1, 98)) | |
remaining = 100 - sg_val | |
gfl_val = 100 - sg_val - gfm_val | |
if gfl_val < 1: | |
gfl_val = 1 | |
gfm_val = 100 - sg_val - gfl_val | |
ix = find_index(sg_val, gfl_val, gfm_val) | |
if ix == "Not found" or load_csv(ix, folder) is None: | |
return sg_val, gfm_val, gfl_val, "", mpltern_plot(sg_val, gfl_val, gfm_val), blank_figure("No voltage"), blank_figure("No generation"), blank_figure("No eigenvalues") | |
return sg_val, gfm_val, gfl_val, ix, mpltern_plot(sg_val, gfl_val, gfm_val), voltage_plot(ix, folder), generation_plot(ix, folder), eigenvalue_plot(ix, folder) | |
def on_gfl_change(sg_val, gfm_val, gfl_val, folder): | |
gfl_val = int(np.clip(gfl_val, 1, 98)) | |
remaining = 100 - sg_val | |
gfm_val = 100 - sg_val - gfl_val | |
if gfm_val < 1: | |
gfm_val = 1 | |
gfl_val = 100 - sg_val - gfm_val | |
ix = find_index(sg_val, gfl_val, gfm_val) | |
if ix == "Not found" or load_csv(ix, folder) is None: | |
return sg_val, gfm_val, gfl_val, "", mpltern_plot(sg_val, gfl_val, gfm_val), blank_figure("No voltage"), blank_figure("No generation"), blank_figure("No eigenvalues") | |
return sg_val, gfm_val, gfl_val, ix, mpltern_plot(sg_val, gfl_val, gfm_val), voltage_plot(ix, folder), generation_plot(ix, folder), eigenvalue_plot(ix, folder) | |
def on_folder_change(sg_val, gfm_val, gfl_val, folder): | |
ix = find_index(sg_val, gfl_val, gfm_val) | |
ix_int = int(ix) if ix != "Not found" else -1 | |
return ( | |
ix, | |
mpltern_plot(sg_val, gfl_val, gfm_val), | |
voltage_plot(ix_int,folder) if ix_int >= 0 else blank_figure("No voltage"), | |
generation_plot(ix_int,folder) if ix_int >= 0 else blank_figure("No voltage"), | |
eigenvalue_plot(ix_int,folder) if ix_int >= 0 else blank_figure("No eigenvalues"), | |
# plot_result(ix_int,folder) if ix_int >= 0 else blank_figure("No prediction") | |
) | |
def find_index(sg, gfl, gfm): | |
target = np.array([sg, gfl, gfm]) | |
for i, row in enumerate(generation_mix): | |
if np.array_equal(row, target): | |
return str(i) | |
return "Not found" | |
# --- Custom Keras functions --- | |
def squeeze_last_axis(x): | |
import tensorflow as tf | |
return tf.squeeze(x, axis=-1) | |
def zeros_like_input(x): | |
import tensorflow as tf | |
return tf.zeros_like(x) | |
class GraphConvLSTMCell(tf.keras.layers.Layer): | |
def __init__(self, units, **kwargs): | |
super(GraphConvLSTMCell, self).__init__(**kwargs) | |
self.units = units | |
def build(self, input_shape): | |
self.gc_x = tf.keras.layers.Dense(4 * self.units, use_bias=False) | |
self.gc_h = tf.keras.layers.Dense(4 * self.units, use_bias=True) | |
def call(self, inputs, states): | |
X, A = inputs | |
h_prev, c_prev = states | |
AX = tf.matmul(A, X) | |
gates_x = self.gc_x(AX) | |
gates_h = self.gc_h(h_prev) | |
gates = gates_x + gates_h | |
i, f, o, g = tf.split(gates, num_or_size_splits=4, axis=-1) | |
i = tf.sigmoid(i) | |
f = tf.sigmoid(f) | |
o = tf.sigmoid(o) | |
g = tf.tanh(g) | |
c = f * c_prev + i * g | |
h = o * tf.tanh(c) | |
return h, [h, c] | |
def get_config(self): | |
return {"units": self.units, **super().get_config()} | |
# --- Load resources --- | |
scaler = joblib.load("scaler_9bus.pkl") | |
model = tf.keras.models.load_model( | |
"9bus_li010_and_sc_w1000_genmix_sequential_lstgcm2_buses468_gens.keras", | |
custom_objects={ | |
"GraphConvLSTMCell": GraphConvLSTMCell, | |
}, | |
safe_mode=False | |
) | |
# --- Placeholder for adjacency computation --- | |
def compute_multilayer_adjacency(X, dt, total_vars, num_gens, r=5): | |
X1, X2 = X[:-1, :].T, X[1:, :].T | |
U, S, Vh = np.linalg.svd(X1, full_matrices=False) | |
Ur, Sr, Vr = U[:, :r], np.diag(S[:r]), Vh[:r, :] | |
A_tilde = Ur.T @ X2 @ Vr.T @ np.linalg.pinv(Sr) | |
eigvals, W = np.linalg.eig(A_tilde) | |
Phi = X2 @ Vr.T @ np.linalg.pinv(Sr) @ W | |
element_factors = np.abs(Phi) | |
adj_part_factors = element_factors @ element_factors.T | |
mode_magnitudes = element_factors.mean(axis=1) | |
adj_mode_magnitude = np.outer(mode_magnitudes, mode_magnitudes) | |
mode_phases = np.angle(Phi) | |
phase_per_element = mode_phases.mean(axis=1) | |
adj_mode_phase = np.outer(phase_per_element, phase_per_element) | |
eig_real = np.abs(eigvals.real.mean()) | |
adj_eig_real = np.full((total_vars, total_vars), eig_real) | |
eig_norm = np.linalg.norm(eigvals) | |
adj_eig_norm = np.full((total_vars, total_vars), eig_norm) | |
# gen_mix_weights = np.ones(total_vars) | |
# gen_mix_weights[-num_gens:] = np.linspace(0.5, 1.5, num_gens) | |
# adj_gen_mix = np.outer(gen_mix_weights, gen_mix_weights) | |
adj_matrix = np.stack([ | |
adj_part_factors, | |
adj_mode_magnitude, | |
adj_mode_phase, | |
adj_eig_real, | |
adj_eig_norm, | |
# adj_gen_mix | |
], axis=-1) | |
for layer in range(adj_matrix.shape[-1]): | |
max_val = adj_matrix[:, :, layer].max() | |
if max_val != 0: | |
adj_matrix[:, :, layer] /= max_val | |
else: | |
adj_matrix[:, :, layer] = 0 | |
return adj_matrix | |
# --- Preprocessing and prediction pipeline --- | |
def test_read(ix,folder): | |
csv_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename=f"{ix}_100.csv", | |
token=os.getenv("DATA_ACCESS"), | |
repo_type="dataset" | |
) | |
df_raw = load_csv(ix, folder) # loads it properly with skiprows=2 | |
data_list = [] | |
indices = [] | |
# df_raw = pd.read_csv(f"./9bus_load_inc/{int(ix)}_100.csv", header=None, on_bad_lines='skip', low_memory=False) | |
# df_raw = load_csv(ix, folder) | |
if df_raw is None: | |
return blank_figure("CSV not found") | |
meta_components = df_raw.iloc[0] | |
meta_variables = df_raw.iloc[1] | |
max_length = 30000 | |
if len(df_raw.iloc[2:60002]) == 60000: | |
data = df_raw.iloc[2:max_length+2].reset_index(drop=True) | |
# Bus-related data explicitly | |
bus_names = [f'Bus {i}' for i in [4,6,8]] | |
bus_vars = ['m:u1 in p.u.', 'm:fehz in Hz'] | |
# Generator data explicitly | |
gen_names = ['G1', 'GFM', 'PV GFL'] | |
gen_vars = ['m:P:bus1 in MW', 'm:Q:bus1 in Mvar', 'n:fehz:bus1 in Hz'] | |
selected_columns = [] | |
renamed_columns = [] | |
# Select bus columns | |
for col_idx, (comp, var) in enumerate(zip(meta_components, meta_variables)): | |
if comp in bus_names and var in bus_vars: | |
selected_columns.append(data.columns[col_idx]) | |
suffix = 'V' if 'u1' in var else 'f' | |
renamed_columns.append(f"{comp.replace(' ', '')}_{suffix}") | |
# Select generator columns | |
for col_idx, (comp, var) in enumerate(zip(meta_components, meta_variables)): | |
if comp in gen_names and var in gen_vars: | |
selected_columns.append(data.columns[col_idx]) | |
var_short = var.split(':')[1].replace('bus1 in ', '').replace(' ', '') | |
renamed_columns.append(f"{comp.replace(' ', '')}_{var_short}") | |
# Select generator columns | |
df_clean = data[selected_columns].copy() | |
df_clean.columns = renamed_columns | |
df_clean = df_clean.astype(float) | |
data_list.append(df_clean.values) | |
indices.append(ix) | |
data_array = np.stack(data_list, axis=0) | |
data = data_array | |
# Parameters | |
dt = 0.001 | |
window_size = 1000 | |
step_size = 100 | |
sequence_length = 5 | |
num_buses = 3 | |
num_gens = 3 | |
total_vars = 2 * num_buses + 3 * num_gens | |
np.random.seed(42) | |
r = 5 | |
X_sequences, A_sequences, y_labels, y_labels_bin = [], [], [], [] | |
for i, idx in enumerate(indices): | |
eig_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename=f"eigenvalues_b_{idx}_100.csv", | |
token=os.getenv("DATA_ACCESS"), | |
repo_type="dataset" | |
) | |
largest_eig = np.loadtxt(eig_path, delimiter=',')[0][0] | |
# largest_eig = np.loadtxt(f'./{folder}/eigenvalues_b_{idx}_100.csv', delimiter=',')[0][0] | |
print(f'largest eigenvalue: {largest_eig}') | |
# Precompute all adjacencies clearly once | |
adj_list = [] | |
X_windows = [] | |
for j in range(0, max_length - window_size + 1, step_size): | |
window = data_array[i, j:j + window_size, :] | |
X_windows.append(window) | |
adj_matrix = compute_multilayer_adjacency(window, dt, total_vars, num_gens, r) | |
adj_list.append(adj_matrix) | |
# Use precomputed adjacencies explicitly in sequences | |
num_sequences = len(X_windows) - sequence_length + 1 | |
for seq_start in range(num_sequences): | |
X_seq = X_windows[seq_start:seq_start + sequence_length] | |
A_seq = adj_list[seq_start:seq_start + sequence_length] | |
X_sequences.append(X_seq) | |
A_sequences.append(A_seq) | |
y_labels.append(largest_eig) | |
y_labels_bin.append( | |
1 if largest_eig > 0.00001 and (19000 + seq_start * step_size) > (20000 - window_size) else 0 | |
) | |
X_sequences = np.array(X_sequences) | |
adj_matrices = np.array(A_sequences) | |
X_scaled = scaler.transform(X_sequences.reshape(-1, X_sequences.shape[-1])) | |
# Reshape back to original shape | |
X_scaled = X_scaled.reshape(X_sequences.shape) | |
y_labels = np.array(y_labels) | |
y_labels_bin = np.array(y_labels_bin) | |
print("DEBUG: test_read returning", type(X_scaled), type(adj_matrices), type(y_labels_bin), type(largest_eig), type(df_clean), type(df_raw)) | |
return X_scaled, adj_matrices, y_labels_bin, largest_eig, df_clean, df_raw | |
def plot_result(ix_str, folder): | |
try: | |
if ix_str == "Not found": | |
return blank_figure("No valid index for prediction") | |
ix = int(ix_str) | |
X, A, y, eig, _, df_raw = test_read(ix, folder) | |
y_p = model.predict([X, A]).flatten() | |
fig, ax = plt.subplots(figsize=(16, 10)) | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.scatter(range(len(y_p)), y_p, s=1) | |
ax.set_ylim([-0.5, 1.5]) | |
ax.set_title(f'Case {ix} Destabilization Likelihood Over Time') | |
ax.set_ylabel('Destabilization Likelihood') | |
ax.set_xlabel('Time (100 ms)') | |
fig.tight_layout() | |
return fig | |
except Exception as e: | |
fig, ax = plt.subplots() | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.text(0.5, 0.5, f"❌ {str(e)}", ha='center', va='center') | |
ax.axis('off') | |
fig.tight_layout() | |
return fig | |
def plot_waveform_from_df(df_raw): | |
try: | |
data = df_raw[2:60000:100, [7, 11, 15]].astype(float) | |
fig, ax = plt.subplots() | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.plot(data.index, data.iloc[:, 0], label='Col 9') | |
ax.plot(data.index, data.iloc[:, 1], label='Col 13') | |
ax.plot(data.index, data.iloc[:, 2], label='Col 17') | |
ax.set_title("Selected Waveforms") | |
ax.legend() | |
return fig | |
except Exception as e: | |
fig, ax = plt.subplots() | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.text(0.5, 0.5, f"Waveform Error: {e}", ha='center', va='center') | |
ax.axis('off') | |
return fig | |
def voltage_plot(ix, folder): | |
df_raw = load_csv(ix, folder) | |
if df_raw is None: | |
return blank_figure("CSV not found") | |
try: | |
voltages = df_raw.iloc[2:60000:600, [7, 11, 15]].astype(float) | |
except Exception as e: | |
return blank_figure(f"Voltage plot error: {e}") | |
fig, ax = plt.subplots(figsize=(16,5)) | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.plot(voltages.index, voltages.iloc[:, 0], label='Bus 4') | |
ax.plot(voltages.index, voltages.iloc[:, 1], label='Bus 6') | |
ax.plot(voltages.index, voltages.iloc[:, 2], label='Bus 8') | |
ax.set_title("Principle Bus Voltages") | |
ax.set_ylabel("Voltage (p.u.)") | |
ax.set_xlabel("Time (ms)") | |
ax.legend() | |
fig.tight_layout(pad=2) | |
return fig | |
def generation_plot(ix, folder): | |
df_raw = load_csv(ix, folder) | |
if df_raw is None: | |
return blank_figure("CSV not found") | |
try: | |
SG = np.sqrt(df_raw.iloc[2:,19].astype(float)**2 + df_raw.iloc[2:,20].astype(float)**2) | |
GFM = np.sqrt(df_raw.iloc[2:,22].astype(float)**2 + df_raw.iloc[2:,23].astype(float)**2) | |
GFL = np.sqrt(df_raw.iloc[2:,51].astype(float)**2 + df_raw.iloc[2:,52].astype(float)**2) | |
except Exception as e: | |
return blank_figure(f"Generation plot error: {e}") | |
fig, ax = plt.subplots(figsize=(16,5)) | |
fig.patch.set_facecolor("#1c1c1c") | |
ax.plot(SG.index, SG, label='SG',color='#0096FF') | |
ax.plot(GFM.index, GFM, label='GFM',color='#FFFF00') | |
ax.plot(GFL.index, GFL, label='GFL',color='#39FF14') | |
ax.set_title("Generation by Type") | |
ax.set_ylabel("Dispatches (MVA)") | |
ax.set_xlabel("Time (ms)") | |
ax.legend() | |
fig.tight_layout(pad=2) | |
return fig | |
def eigenvalue_plot(ix, folder): | |
try: | |
eig_path = hf_hub_download( | |
repo_id=f"SevatarOoi/{folder}", | |
filename=f"eigenvalues_b_{ix}_100.csv", | |
token=os.getenv("DATA_ACCESS"), | |
repo_type="dataset" | |
) | |
eigenvalues = np.loadtxt(eig_path, delimiter=',') | |
# eigenvalues = np.loadtxt(f'./{folder}/eigenvalues_b_{int(ix)}_100.csv', delimiter=',') | |
except: | |
return go.Figure().update_layout( | |
paper_bgcolor="#111111", | |
plot_bgcolor="#111111", | |
annotations=[dict(text="No such case", x=0.5, y=0.5, showarrow=False, font=dict(color="white"))] | |
) | |
# Clip values | |
real = np.clip(eigenvalues[:, 0], -12, 12) | |
imag = np.clip(eigenvalues[:, 1], -12, 12) | |
fig = go.Figure() | |
fig.add_trace(go.Scatter( | |
x=real, | |
y=imag, | |
mode='markers', | |
marker=dict(color='cyan', size=5), | |
hovertemplate="Re: %{x:.2f}<br>Im: %{y:.2f}<extra></extra>" | |
)) | |
fig.update_layout( | |
title="System Eigenvalues", | |
xaxis=dict(title="Real Part", range=[-12, 12], color='white', gridcolor='gray'), | |
yaxis=dict(title="Imaginary Part", range=[-12, 12], color='white', gridcolor='gray'), | |
plot_bgcolor="#111111", | |
paper_bgcolor="#111111", | |
font=dict(color='white'), | |
) | |
return fig | |
def regenerate_adjacency_gif(ix_display, folder): | |
ix = int(ix_display.strip()) | |
X_scaled, adj_matrices, _, _, _, _ = test_read(ix, folder) | |
gif_path = f"adj_gif_{ix}.gif" | |
generate_adjacency_gif_from_tensor(adj_matrices, layer=0, output_path=gif_path) | |
return gif_path | |
def switch_ui(current): | |
return ( | |
gr.update(visible=not current), | |
gr.update(visible=current), | |
not current # update internal state | |
) | |
# with open("grid_cortex_detailed_system_prompts.txt", "r") as f: | |
# with open("grid_cortex_super_summary_prompts.txt", "r") as f: | |
# system_prompt = "\n".join(f.read().splitlines()) | |
# llm = Llama( | |
# model_path="mistral-7b-instruct-v0.1.Q4_K_M.gguf", | |
# n_ctx=1024, # reduced context window = faster | |
# n_threads=8, | |
# n_batch=64 # optionally tune for faster CPU throughput | |
# ) | |
# MAX_RESPONSE_TOKENS = 1500000 # lower for speed | |
# MAX_CHAT_PAIRS = 5 | |
def count_tokens(text): | |
return len(llm.tokenize(text.encode("utf-8"))) | |
def trim_chat_by_tokens(history, system_prompt, user_input, max_tokens=1024, buffer_tokens=200): | |
# Reserve space for response and system + current user input | |
reserved = buffer_tokens + count_tokens(system_prompt + f"\nUser: {user_input}\nAssistant:") | |
prompt_parts = [] | |
total_tokens = 0 | |
for user, bot in reversed(history): # start from latest | |
block = f"User: {user}\nAssistant: {bot}\n" | |
block_tokens = count_tokens(block) | |
if total_tokens + block_tokens + reserved > max_tokens: | |
break | |
prompt_parts.insert(0, block) # prepend | |
total_tokens += block_tokens | |
history_text = "".join(prompt_parts) | |
full_prompt = system_prompt + "\n" + history_text + f"User: {user_input}\nAssistant:" | |
return full_prompt | |
context_limit = 1024 # same as n_ctx you passed to Llama | |
max_response_tokens = 1500000 | |
def chat_with_gpt(system_prompt, history, user_input): | |
prompt = f"{system_prompt}\nUser: {user_input}\nAssistant:" | |
output = llm( | |
prompt=prompt, | |
max_tokens=1500000, | |
temperature=0.0, | |
echo=False | |
) | |
response = output["choices"][0]["text"].strip() | |
# Store only current pair (if you still want chatbot display) | |
return [{"role": "user", "content": user_input}, {"role": "assistant", "content": response}], "" | |
def visualize_adjacency_2d_clean( | |
adj_matrix, | |
layer, | |
max_node_size=3000, | |
max_node_fontsize=16, | |
max_edge_width=8, | |
max_edge_fontsize=14 | |
): | |
raw_labels = [ | |
"Bus 4 Voltage", "Bus 4 Frequency", | |
"Bus 6 Voltage", "Bus 6 Frequency", | |
"Bus 8 Voltage", "Bus 8 Frequency", | |
"SG Active Power", "SG Reactive Power", "SG Frequency", | |
"GFM Active Power", "GFM Reactive Power", "GFM Frequency", | |
"GFL Active Power", "GFL Reactive Power", "GFL Frequency" | |
] | |
layer_labels = ["Participation Factors", "Mode Magnitudes", "Modes Phases"] | |
multiline_labels = ["\n".join(label.split()) for label in raw_labels] | |
sg_nodes = multiline_labels[6:9] | |
gfm_nodes = multiline_labels[9:12] | |
gfl_nodes = multiline_labels[12:15] | |
adj_matrix = adj_matrix[:, :, int(layer)].copy() | |
np.fill_diagonal(adj_matrix, 0) | |
G = nx.from_numpy_array(adj_matrix) | |
threshold = 0.05 | |
edges_to_remove = [(u, v) for u, v, d in G.edges(data=True) if d["weight"] < threshold] | |
G.remove_edges_from(edges_to_remove) | |
raw_pos = nx.circular_layout(G) | |
mapping = dict(zip(G.nodes(), multiline_labels)) | |
G = nx.relabel_nodes(G, mapping) | |
pos = {mapping[k]: v for k, v in raw_pos.items()} | |
node_strength = dict(G.degree(weight="weight")) | |
node_sizes = [min(max_node_size, max(300, node_strength[n] * 1000)) for n in G.nodes()] | |
node_borders = [] | |
for n in G.nodes(): | |
if n in sg_nodes: | |
node_borders.append("#00ffff") | |
elif n in gfm_nodes: | |
node_borders.append("#ffff00") | |
elif n in gfl_nodes: | |
node_borders.append("#00ff66") | |
else: | |
node_borders.append("#6a0dad") | |
edge_colors = [] | |
edge_widths = [] | |
edge_font_sizes = {} | |
for u, v, d in G.edges(data=True): | |
w = d["weight"] | |
edge_widths.append(min(max_edge_width, w * 4)) | |
edge_font_sizes[(u, v)] = min(max_edge_fontsize, max(9, w * 20)) | |
if u in sg_nodes and v in gfm_nodes: | |
edge_colors.append("#80ff80") | |
elif u in sg_nodes and v in gfl_nodes: | |
edge_colors.append("#66ffcc") | |
elif u in gfm_nodes and v in gfl_nodes: | |
edge_colors.append("#ccff33") | |
elif u in sg_nodes: | |
edge_colors.append("#00ffff") | |
elif u in gfm_nodes: | |
edge_colors.append("#ffff00") | |
elif u in gfl_nodes: | |
edge_colors.append("#00ff66") | |
else: | |
edge_colors.append("#ffffff") | |
node_font_sizes = {n: min(max_node_fontsize, max(9, node_strength[n] * 5)) for n in G.nodes()} | |
fig, ax = plt.subplots(figsize=(12, 12), facecolor="none") | |
ax.set_xlim(-1.1, 1.1) | |
ax.set_ylim(-1.1, 1.1) | |
for n in G.nodes(): | |
idx = list(G.nodes()).index(n) | |
nx.draw_networkx_nodes( | |
G, pos, | |
ax=ax, | |
nodelist=[n], | |
node_shape="o", | |
node_color="#111111", | |
edgecolors=[node_borders[idx]], | |
linewidths=2, | |
node_size=[node_sizes[idx]] | |
) | |
ax.text( | |
*pos[n], n, | |
ha='center', va='center', | |
fontsize=node_font_sizes[n], | |
fontweight='bold', | |
color='white', | |
path_effects=[ | |
pe.Stroke(linewidth=1.5, foreground="black"), | |
pe.Normal() | |
] | |
) | |
nx.draw_networkx_edges( | |
G, pos, | |
ax=ax, | |
edge_color=edge_colors, | |
width=edge_widths | |
) | |
edge_labels = {(u, v): f"{G[u][v]['weight']:.2f}" for u, v in G.edges()} | |
for (u, v), label in edge_labels.items(): | |
x = (pos[u][0] + pos[v][0]) / 2 | |
y = (pos[u][1] + pos[v][1]) / 2 + 0.04 | |
ax.text( | |
x, y, label, | |
fontsize=edge_font_sizes[(u, v)], | |
color='white', | |
ha='center', va='center', | |
path_effects=[ | |
pe.Stroke(linewidth=1.5, foreground="black"), | |
pe.Normal() | |
] | |
) | |
# plt.title(f"Adjacency Graph ({layer_labels[int(layer)]})", fontsize=16, color='white') | |
plt.axis("off") | |
fig.patch.set_alpha(0.0) | |
plt.close(fig) | |
return fig | |
# # Visualize last graph in the last sequence | |
# X_scaled, adj_matrices, y_labels_bin, largest_eig, df_clean, df_raw = test_read(33,init_folder) | |
# last_graph = adj_matrices[-1,-1,:,:,:] # shape (N, N, D) | |
# visualize_adjacency_2d_clean(last_graph,1) | |
from PIL import Image | |
def generate_adjacency_gif_from_tensor(adj_matrices, layer=0, step=10, total_time_sec=60, output_path="adjacency_evolution.gif"): | |
import os | |
import tempfile | |
import matplotlib.pyplot as plt | |
import networkx as nx | |
import matplotlib.patheffects as pe | |
import numpy as np | |
substack = adj_matrices[:, -1, :, :, :] | |
num_frames = substack.shape[0] | |
dt = total_time_sec / num_frames | |
raw_labels = [ | |
"Bus 4 Voltage", "Bus 4 Frequency", | |
"Bus 6 Voltage", "Bus 6 Frequency", | |
"Bus 8 Voltage", "Bus 8 Frequency", | |
"SG Active Power", "SG Reactive Power", "SG Frequency", | |
"GFM Active Power", "GFM Reactive Power", "GFM Frequency", | |
"GFL Active Power", "GFL Reactive Power", "GFL Frequency" | |
] | |
multiline_labels = ["\n".join(label.split()) for label in raw_labels] | |
sg_nodes = multiline_labels[6:9] | |
gfm_nodes = multiline_labels[9:12] | |
gfl_nodes = multiline_labels[12:15] | |
temp_dir = tempfile.mkdtemp() | |
frame_paths = [] | |
for i in range(0, num_frames, step): | |
adj = substack[i, :, :, int(layer)].copy() | |
np.fill_diagonal(adj, 0) | |
G = nx.from_numpy_array(adj) | |
threshold = 0.05 | |
G.remove_edges_from([(u, v) for u, v, d in G.edges(data=True) if d['weight'] < threshold]) | |
mapping = dict(zip(G.nodes(), multiline_labels)) | |
G = nx.relabel_nodes(G, mapping) | |
pos = nx.circular_layout(G) | |
node_strength = dict(G.degree(weight='weight')) | |
node_sizes = [min(3000, max(300, node_strength[n] * 1000)) for n in G.nodes()] | |
node_borders = [ | |
'#00ffff' if n in sg_nodes else | |
'#ffff00' if n in gfm_nodes else | |
'#00ff66' if n in gfl_nodes else | |
'#6a0dad' | |
for n in G.nodes() | |
] | |
edge_colors = [] | |
edge_widths = [] | |
edge_font_sizes = {} | |
for u, v, d in G.edges(data=True): | |
w = d['weight'] | |
edge_widths.append(min(8, w * 4)) | |
edge_font_sizes[(u, v)] = min(14, max(9, w * 20)) | |
if u in sg_nodes and v in gfm_nodes: | |
edge_colors.append("#80ff80") | |
elif u in sg_nodes and v in gfl_nodes: | |
edge_colors.append("#66ffcc") | |
elif u in gfm_nodes and v in gfl_nodes: | |
edge_colors.append("#ccff33") | |
elif u in sg_nodes: | |
edge_colors.append("#00ffff") | |
elif u in gfm_nodes: | |
edge_colors.append("#ffff00") | |
elif u in gfl_nodes: | |
edge_colors.append("#00ff66") | |
else: | |
edge_colors.append("#ffffff") | |
node_font_sizes = {n: min(16, max(9, node_strength[n] * 5)) for n in G.nodes()} | |
fig, ax = plt.subplots(figsize=(12, 12)) | |
ax.set_xlim(-1.1, 1.1) | |
ax.set_ylim(-1.1, 1.1) | |
fig.patch.set_facecolor("#111111") | |
ax.set_facecolor("#111111") | |
ax.axis("off") | |
for n in G.nodes(): | |
idx = list(G.nodes()).index(n) | |
nx.draw_networkx_nodes( | |
G, pos, | |
ax=ax, | |
nodelist=[n], | |
node_shape="o", | |
node_color="#111111", | |
edgecolors=[node_borders[idx]], | |
linewidths=2, | |
node_size=[node_sizes[idx]] | |
) | |
ax.text( | |
*pos[n], n, | |
ha='center', va='center', | |
fontsize=node_font_sizes[n], | |
fontweight='bold', | |
color='white', | |
path_effects=[ | |
pe.Stroke(linewidth=1.5, foreground="black"), | |
pe.Normal() | |
] | |
) | |
nx.draw_networkx_edges( | |
G, pos, | |
ax=ax, | |
edge_color=edge_colors, | |
width=edge_widths | |
) | |
edge_labels = {(u, v): f"{G[u][v]['weight']:.2f}" for u, v in G.edges()} | |
for (u, v), label in edge_labels.items(): | |
x = (pos[u][0] + pos[v][0]) / 2 | |
y = (pos[u][1] + pos[v][1]) / 2 + 0.04 | |
ax.text( | |
x, y, label, | |
fontsize=edge_font_sizes[(u, v)], | |
color='white', | |
ha='center', va='center', | |
path_effects=[ | |
pe.Stroke(linewidth=1.5, foreground="black"), | |
pe.Normal() | |
] | |
) | |
t_sec = int(round(i * dt)) | |
label = "(Pre-Event)" if t_sec < 21 else "(Post-Event)" | |
ax.set_title(f"Time: {t_sec} sec {label}", fontsize=18, color='white') | |
frame_path = os.path.join(temp_dir, f"frame_{i:04d}.png") | |
fig.savefig(frame_path, dpi=100, bbox_inches='tight') | |
plt.close(fig) | |
frame_paths.append(frame_path) | |
# Load frames as PIL images | |
images = [Image.open(f).convert("RGB") for f in frame_paths] | |
images[0].save( | |
output_path, | |
save_all=True, | |
append_images=images[1:], | |
duration=500, # milliseconds per frame = 2 sec | |
loop=0 | |
) | |
print(f"✅ GIF saved to {output_path}") | |
return output_path | |
import gradio as gr | |
with gr.Blocks( | |
theme=gr.themes.Base(), | |
css=""" | |
:root { | |
--color-background-primary: #111111 !important; | |
--color-text: #ffffff !important; | |
} | |
body { | |
background-color: #111111 !important; | |
color: #ffffff !important; | |
} | |
.gradio-container, .main, .gr-block { | |
background-color: #111111 !important; | |
color: #ffffff !important; | |
} | |
.title-block { | |
background-color: #1c1c1c; | |
border-radius: 0px; | |
padding: 6px; | |
margin-bottom: 0px; | |
font-size: 96px; | |
} | |
.gr-dropdown { | |
margin-top: 2px !important; | |
background-color: #1c1c1c !important; | |
} | |
.donut-row .gr-box { | |
border: none !important; | |
box-shadow: none !important; | |
background: transparent !important; | |
padding: 0 !important; | |
margin: 0 4px 0 0 !important; | |
} | |
#ix-label { | |
margin: 0 !important; | |
padding: 0 !important; | |
line-height: 0.5; | |
font-size: 13px; | |
background-color: 1c1c1c; | |
} | |
#ix-description { | |
margin: -10px 12px 0 -12px !important; /* top/right/bottom/left */ | |
padding: 0 !important; | |
font-size: 11px; | |
line-height: 1.; | |
color: #888888 !important; /* lighter grey for readability */ | |
background-color: transparent !important; | |
text-align: justify !important; | |
} | |
.auto-toggle .gr-button { | |
background-color: #333 !important; | |
color: white !important; | |
font-weight: bold; | |
border: 1px solid #555 !important; | |
} | |
.auto-on .gr-button { | |
background-color: #00FFFF !important; | |
color: black !important; | |
border: 1px solid #00eaff !important; | |
box-shadow: 0 0 8px #00eaff88; | |
font-weight: bold; | |
} | |
.wrap-button .gr-button { | |
white-space: normal !important; | |
line-height: 1.2; | |
text-align: center; | |
width: 90px !important; | |
font-size: 13px; | |
padding: 6px; | |
} | |
.gr-box { | |
background-color: #1c1c1c !important; | |
border: none !important; | |
} | |
#metric-row { | |
flex-wrap: nowrap !important; | |
gap: 0px; | |
margin-top: 8px; | |
margin-bottom: 12px; | |
} | |
.metric-box { | |
text-align: center; | |
border-right: 1px solid #444; | |
padding: 0 12px; | |
} | |
.metric-box:last-child { | |
border-right: none; | |
} | |
#metric-bar { | |
display: flex; | |
justify-content: space-between; | |
align-items: center; | |
background-color: #1c1c1c; | |
border-radius: 8px; | |
padding: 10px 16px; | |
} | |
.metric-container { | |
display: flex; | |
align-items: center; | |
flex: 1; | |
} | |
.metric-content { | |
flex: 1; | |
text-align: center; | |
} | |
.metric-vline { | |
width: 1px; | |
height: 48px; | |
background-color: #444; | |
margin: 0 12px; | |
} | |
.metric-label { | |
font-size: 11px; | |
color: #aaa; | |
margin-bottom: 2px; | |
} | |
.metric-value { | |
font-size: 26px; | |
font-weight: bold; | |
line-height: 1.1; | |
color: white; | |
} | |
.gr-plot:hover, .gr-image:hover, .gr-textbox:hover { | |
box-shadow: 0 0 12px #00ffff99 !important; | |
transform: scale(1.01); | |
} | |
.gr-button { | |
border-radius: 8px !important; | |
padding: 10px 14px !important; | |
font-weight: bold !important; | |
transition: all 0.2s ease; | |
background-color: #222 !important; | |
color: white !important; | |
border: 1px solid #00ffff66 !important; | |
} | |
.gr-button:hover { | |
background-color: #00ffff22 !important; | |
border-color: #00ffff !important; | |
box-shadow: 0 0 8px #00ffff88; | |
} | |
.title-block { | |
color: #ffffff; | |
padding: 6px 4px; | |
font-size: 42px; | |
font-weight: 700; | |
text-align: center; | |
font-family: "Inter", "Segoe UI", sans-serif; | |
text-transform: none; | |
letter-spacing: 0.5px; | |
margin-bottom: 8px; | |
} | |
.gr-box, .gr-plot, .gr-image, .gr-textbox, .gr-dropdown { | |
background-color: #1c1c1c !important; | |
background: #111111 !important; | |
border: 1px solid #333 !important; | |
border-radius: 10px !important; | |
box-shadow: 0 0 8px #00ffff44 !important; | |
padding: 8px !important; | |
transition: all 0.2s ease-in-out; | |
} | |
.color-0 { color: #FFFFFF !important; } | |
.color-1 { color: #FFFFFF !important; } | |
.color-2 { color: #FFFFFF !important; } | |
.color-3 { color: #FFFFFF !important; } | |
.color-4 { color: #FFFFFF !important; } | |
padding-right: 8px; | |
""" | |
) as demo: | |
with gr.Row(): | |
image_index = gr.State(False) # starts at "distribution.png" | |
auto_predict = gr.State(True) | |
init_donut_values = [4239, 612, 207, 393, 47] | |
init_donut_titles = ["Stable<br>Cases", "Unstable<br>Cases", "Failed<br>Cases", "Voltage<br>Violations", "Oscillatory<br>Unstable Cases"] | |
init_donut_colors = ['#0040FF', '#FF00FF', '#AA00FF', '#00FFAA'] # neon blue, red, purple, cyan | |
# text_colors = ["#0040FF", "#FF00FF", "#AA00FF", "#00FFAA", "#008B8B"] | |
text_colors = ["#FFFFFF", "#FFFFFF", "#FFFFFF", "#FFFFFF", "#FFFFFF"] | |
layer = 0 | |
metric_blocks = [] | |
init_sg, init_gfm, init_gfl = 33, 33, 34 | |
init_ix = find_index(init_sg, init_gfl, init_gfm) | |
ix = init_ix | |
init_folder = "Nine-Bus-Load-Increase-Event" | |
formatted = [f"{v:,}" for v in init_donut_values] | |
display_text = ", ".join(formatted) | |
with gr.Column(scale=1): | |
# Title in grey block | |
gr.Markdown('<div class="title-block">GRID CORTEX</div>') | |
folder_selector = gr.Dropdown( | |
choices=["Nine-Bus-Load-Increase-Event", "Nine-Bus-Short-Circuit-Event"], | |
value="Nine-Bus-Load-Increase-Event", | |
label="Select Dataset Folder", | |
interactive=True | |
) | |
gr.HTML('<div id="ix-description">The time-series data and modal analysis results are located within each folder.</div>') | |
try: | |
image_path = hf_hub_download( | |
repo_id="SevatarOoi/Nine-Bus-Load-Increase-Event", | |
filename="distribution.png", | |
repo_type="dataset", | |
token=os.getenv("DATA_ACCESS"), | |
local_dir="temp_images", | |
local_dir_use_symlinks=False | |
) | |
except Exception as e: | |
print(f"⚠️ Failed to fetch distribution image: {e}") | |
image_path = None | |
distribution_image = gr.Image( | |
label=None, | |
interactive=False, | |
value=image_path, | |
elem_id="floating-distribution" | |
) | |
gr.HTML('<div id="ix-description">The overall distribution of stability in the dataset, in various aspects. Toggle for different views.</div>') | |
folder_selector.change( | |
fn=get_distribution_image, | |
inputs=folder_selector, | |
outputs=distribution_image | |
) | |
toggle_button = gr.Button("Toggle (Takes 3 seconds)") | |
toggle_button.click( | |
fn=cycle_image, | |
inputs=[image_index, folder_selector], | |
outputs=[distribution_image, image_index] | |
) | |
with gr.Column(scale=5): | |
folder_info_box = gr.Markdown(get_folder_info("Nine-Bus-Load-Increase-Event")) | |
# with gr.Row(elem_classes=["donut-row"]): | |
# donut1 = gr.Plot(value=make_donut(init_donut_values[0], init_donut_titles[0], fill_color=init_donut_colors[0]), scale=1, label="", show_label=False) | |
# donut2 = gr.Plot(value=make_donut(init_donut_values[1], init_donut_titles[1], fill_color=init_donut_colors[1]), scale=1, label="", show_label=False) | |
# donut3 = gr.Plot(value=make_donut(init_donut_values[2], init_donut_titles[2], fill_color=init_donut_colors[2]), scale=1, label="", show_label=False) | |
# donut4 = gr.Plot(value=make_donut(init_donut_values[3], init_donut_titles[3], fill_color=init_donut_colors[3]), scale=1, label="", show_label=False) | |
with gr.Row(elem_id="metric-bar"): | |
for i, (value, label) in enumerate(zip(init_donut_values, init_donut_titles)): | |
# Wrap both the metric and optional vertical line into one container | |
html = f""" | |
<div class='metric-container'> | |
<div class='metric-content'> | |
<div class='metric-label'>{label}</div> | |
<div class='metric-value color-{i}'>{value:,}</div> | |
</div> | |
</div> | |
""" | |
metric_blocks.append(gr.HTML(html)) | |
generation_display = gr.Plot(value=generation_plot(init_ix,init_folder), label="", show_label=False) | |
gr.HTML('<div id="ix-description">The dispatches by SG, GFM, and GFL units.</div>') | |
waveform_display = gr.Plot(value=voltage_plot(init_ix,init_folder), label="", show_label=False) | |
gr.HTML('<div id="ix-description">The voltage measurements of buses 4, 6, 8, which are the three buses required for complete observability.</div>') | |
eigenvalue_display = gr.Plot(value=eigenvalue_plot(init_ix,init_folder), label="", show_label=False) | |
gr.HTML('<div id="ix-description">Modal analysis of the system at t = 60 s.</div>') | |
prediction_plot = gr.Plot(value=plot_result(init_ix,init_folder), label="", show_label=False) | |
gr.HTML('<div id="ix-description">Likelihood of the system to destabilize as predicted by the sliding-window DMD workflow.</div>') | |
adj_gif_display = gr.Image(value="adjacency_evolution.gif", label="Adjacency Graph Evolution", interactive=False) | |
gr.HTML('<div id="ix-description">Evolution of the system dynamics calculated using DMD windows. Edge thicknesses are proportional to their values. Node radii are proportional to the sum of conneted edge values.</div>') | |
with gr.Column(scale=1): | |
ix_display = gr.Markdown(f"{init_ix}", elem_id="ix-label") | |
ternary_plot = gr.Plot(value=mpltern_plot(...), label="Ternary Mix Plot") | |
gr.HTML('<div id="ix-description">Location of the current generation mix on the ternary plot.</div>') | |
adj_graph_plot = gr.Plot(value=plot_adjacency_graph(init_ix, init_folder, layer), label="Steady-State Adjacency Graph") | |
gr.HTML('<div id="ix-description">The final system dynamics.</div>') | |
sg = gr.Slider(1, 98, value=init_sg, step=1, label="SG [%]", elem_classes=["square-slider"]) | |
gfm = gr.Slider(1, 98, value=init_gfm, step=1, label="GFM [%]", elem_classes=["square-slider"]) | |
gfl = gr.Slider(1, 98, value=init_gfl, step=1, label="GFL [%]", elem_classes=["square-slider"]) | |
gr.HTML('<div id="ix-description">First adjust the SG penetration, then adjust the GFM penetration. GFM penetration + SG penetration cannot exceed 99%.</div>') | |
with gr.Row(): | |
run_btn = gr.Button("Run Prediction", elem_classes=["wrap-button"],scale=1) | |
auto_btn = gr.Button("Auto-Predict (ON)", elem_classes=["auto-toggle", "auto-on"],scale=1) | |
# adj_button = gr.Button("Show Steady-State Adjacency") | |
# generate_gif_button = gr.Button("Show Adjacency Dynamics") | |
gr.Markdown( | |
""" | |
<div style="padding: 16px; background-color: #1e1e1e; border: 1px solid #333; border-radius: 8px; text-align: center; font-size: 16px;"> | |
<strong>Upcoming!</strong><br>Cortex-GPT for AI-Assisted Data Analysis and Recommendations | |
</div> | |
""" | |
) | |
# chatbot = gr.Chatbot(label="Grid Cortex GPT", type="messages") | |
user_input = gr.Textbox() | |
# chat_state = gr.State([]) | |
# system_prompts = gr.State(system_prompt) | |
# user_input.submit( | |
# fn=chat_with_gpt, | |
# inputs=[system_prompts, chat_state, user_input], | |
# outputs=[chatbot, user_input] | |
# ) | |
sg.release(fn=on_sg_change, inputs=[sg, gfm, gfl, folder_selector], outputs=[sg, gfm, gfl, ix_display, ternary_plot, waveform_display, generation_display, eigenvalue_display]) | |
gfm.release(fn=on_gfm_change, inputs=[sg, gfm, gfl, folder_selector], outputs=[sg, gfm, gfl, ix_display, ternary_plot, waveform_display, generation_display, eigenvalue_display]) | |
gfl.release(fn=on_gfl_change, inputs=[sg, gfm, gfl, folder_selector], outputs=[sg, gfm, gfl, ix_display, ternary_plot, waveform_display, generation_display, eigenvalue_display]) | |
sg.release(fn=maybe_auto_predict, inputs=[sg, gfm, gfl, folder_selector, auto_predict], outputs=prediction_plot) | |
gfm.release(fn=maybe_auto_predict, inputs=[sg, gfm, gfl, folder_selector, auto_predict], outputs=prediction_plot) | |
gfl.release(fn=maybe_auto_predict, inputs=[sg, gfm, gfl, folder_selector, auto_predict], outputs=prediction_plot) | |
sg.release(fn=lambda ix, folder: plot_adjacency_graph(ix, folder, layer=0), inputs=[ix_display, folder_selector], outputs=adj_graph_plot) | |
gfm.release(fn=lambda ix, folder: plot_adjacency_graph(ix, folder, layer=0), inputs=[ix_display, folder_selector], outputs=adj_graph_plot) | |
gfl.release(fn=lambda ix, folder: plot_adjacency_graph(ix, folder, layer=0), inputs=[ix_display, folder_selector], outputs=adj_graph_plot) | |
sg.release(fn=regenerate_adjacency_gif, inputs=[ix_display, folder_selector], outputs=adj_gif_display) | |
gfm.release(fn=regenerate_adjacency_gif, inputs=[ix_display, folder_selector], outputs=adj_gif_display) | |
gfl.release(fn=regenerate_adjacency_gif, inputs=[ix_display, folder_selector], outputs=adj_gif_display) | |
run_btn.click(fn=plot_result, inputs=[ix_display,folder_selector], outputs=[prediction_plot]) | |
folder_selector.change( | |
fn=on_folder_change, | |
inputs=[sg, gfm, gfl, folder_selector], | |
outputs=[ix_display, ternary_plot, waveform_display, generation_display, eigenvalue_display] | |
) | |
folder_selector.change( | |
fn=get_folder_info, | |
inputs=folder_selector, | |
outputs=folder_info_box | |
) | |
# folder_selector.change( | |
# fn=update_donuts_on_folder_change, | |
# inputs=folder_selector, | |
# outputs=[donut1, donut2, donut3, donut4] | |
# ) | |
folder_selector.change( | |
fn=maybe_auto_predict, | |
inputs=[sg, gfm, gfl, folder_selector, auto_predict], | |
outputs=prediction_plot | |
) | |
folder_selector.change( | |
fn=update_metrics_on_folder_change, | |
inputs=[folder_selector], | |
outputs=metric_blocks | |
) | |
folder_selector.change( | |
fn=regenerate_adjacency_gif, | |
inputs=[ix_display, folder_selector], | |
outputs=adj_gif_display | |
) | |
auto_btn.click(fn=toggle_auto_mode, inputs=auto_predict, outputs=[auto_predict,auto_btn]) | |
# llm("System: You are an assistant.\nUser: Hello\nAssistant:", max_tokens=1) | |
demo.launch() | |