'''Gradio demo: kinematic planet detection with an EfficientNet classifier.

Loads a pretrained EfficientNet from the Hugging Face Hub, runs it on an
uploaded ``.npy`` data cube and plots the input together with an early and a
late feature-map activation.
'''
import sys
from types import SimpleNamespace

import gradio as gr
from huggingface_hub import hf_hub_url, cached_download
from matplotlib import cm
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import numpy as np
# import onnxruntime as ort
from PIL import Image
from scipy import special
# import timm
# from transformers import AutoModel, pipeline
from transformers import AutoModelForImageClassification, AutoModel, AutoConfig
import torch

# The project-local model_utils package lives one directory up, so the path
# must be extended before it is imported.
sys.path.insert(1, "../")
# from utils import model_utils, train_utils, data_utils, run_utils
# from model_utils import jason_regnet_maker, jason_efficientnet_maker
from model_utils.efficientnet_config import EfficientNetConfig, EfficientNetPreTrained, EfficientNet

model_path = 'chlab/'
# model_path = './models/'

# plotting parameters
labels = 20
ticks = 14
legends = 14
text = 14
titles = 22
lw = 3
ps = 200
cmap = 'magma'

# Hyperparameters of the trained EfficientNets, keyed by the number of input
# (velocity) channels.
effnet_hparams = {
    47: {
        "num_classes": 2,
        "gamma": 0.04294256770072906,
        "lr": 0.010208864616781627,
        "weight_decay": 0.00014537466483781656,
        "batch_size": 16,
        "num_channels": 47,
        "stochastic_depth_prob": 0.017760418815821067,
        "dropout": 0.039061686292663655,
        "width_mult": 0.7540060155156922,
        "depth_mult": 0.9378692812212488,
        "size": "v2_s",
        "model_type": "efficientnet_47_planet_detection"
    },
    61: {
        "num_classes": 2,
        "gamma": 0.032606396652426956,
        "lr": 0.008692971067922545,
        "weight_decay": 0.00008348389688708425,
        "batch_size": 23,
        "num_channels": 61,
        "stochastic_depth_prob": 0.003581930052432713,
        "dropout": 0.027804120950575217,
        "width_mult": 1.060782511229692,
        "depth_mult": 0.7752918857163054,
        "size": "v2_s",
        "model_type": "efficientnet_61_planet_detection"
    },
    75: {
        "num_classes": 2,
        "gamma": 0.029768470449465057,
        "lr": 0.008383851744497892,
        "weight_decay": 0.000196304392793202,
        "batch_size": 32,
        "num_channels": 75,
        "stochastic_depth_prob": 0.08398410137077088,
        "dropout": 0.03351826828687193,
        "width_mult": 1.144132674734038,
        "depth_mult": 1.2267023928285563,
        "size": "v2_s",
        "model_type": "efficientnet_75_planet_detection"
    }
}
# effnet_config = SimpleNamespace(**effnet_hparams)

# which layers to look at
activation_indices = {'efficientnet': [0, 3]}


def normalize_array(x: list):
    '''Rescales an array linearly so its values lie between 0 and 1.

    Parameters
    ----------
    x : array-like
        Values to rescale; converted with ``np.array``.

    Returns
    -------
    numpy.ndarray
        ``(x - min) / (max - min)``. A constant array maps to all zeros
        (instead of NaNs from 0/0) and an empty array is returned unchanged.
    '''
    x = np.array(x)
    if x.size == 0:
        # np.min / np.max raise on empty input; there is nothing to rescale.
        return x
    shifted = x - np.min(x)
    span = np.max(shifted)
    if span == 0:
        # Constant input: avoid 0/0 and keep the dtype a division would give.
        return np.zeros_like(shifted, dtype=np.result_type(shifted.dtype, np.float32))
    return shifted / span


# def load_model(model: str, activation: bool=True):
#     if activation:
#         model += '_w_activation'
#     # set options for onnx runtime
#     options = ort.SessionOptions()
#     options.intra_op_num_threads = 1
#     options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
#     provider = "CPUExecutionProvider"
#     # start session
#     ort_session = ort.InferenceSession(model_path + '%s.onnx' % (model), options, providers=[provider])
#     # ort_session = ORTModel.load_model(model_path + '%s.onnx' % (model))
#     return ort_session


def get_activations(model, image, model_name: str, layer=None, vmax=2.5, sub_mean=True, channel: int=0):
    '''Gets the prediction and two intermediate activations for an input image.

    Parameters
    ----------
    model :
        Network exposing an indexable/iterable ``features`` attribute and
        callable on ``image``.
    image : torch.Tensor
        Input batch; indexed as ``image[0, channel, :, :]``, so it must be 4-D.
    model_name : str
        Key into ``activation_indices`` selecting which ``features`` stages
        to record (first entry = early, second entry = late).
    layer : int, optional
        Feature channel of the activations to return. ``None`` sums over all
        feature channels.
    vmax : float
        Unused; kept for backward compatibility of the signature.
    sub_mean : bool
        If true, return ``|a - mean(a)|`` for each activation map.
    channel : int
        1-based input channel to show; ``0`` sums over all input channels.

    Returns
    -------
    tuple
        ``(output, in_image, activation_1, activation_2)`` where ``output``
        is the softmax of the model output and ``in_image`` is normalized to
        the range 0..1.

    Raises
    ------
    ValueError
        If ``channel`` is outside ``0..number of input channels``.
    '''
    wanted = activation_indices[model_name]
    last_wanted = max(wanted)

    layer_outputs = {}
    # Inference only: no autograd graph is needed for any of the passes.
    with torch.no_grad():
        temp_image = image
        for i, stage in enumerate(model.features):
            temp_image = stage(temp_image)
            if i in wanted:
                layer_outputs[i] = temp_image
            if i == last_wanted:
                # nothing past the last recorded stage is needed here
                break
        output = model(image).detach().cpu().numpy()

    image = image.detach().cpu().numpy()
    output_1 = layer_outputs[wanted[0]].detach().cpu().numpy()
    output_2 = layer_outputs[wanted[1]].detach().cpu().numpy()

    # get prediction (softmax over the class axis)
    output = special.softmax(output, axis=-1)
    print(output)

    # The UI delivers the channel as a float; 0 means "sum over all channels".
    channel = int(channel)
    n_in = image.shape[1]
    if not 0 <= channel <= n_in:
        raise ValueError("Input channel must be between 0 and %i, got %i" % (n_in, channel))
    if channel == 0:
        # sum over velocity channels
        in_image = np.sum(image[0, :, :, :], axis=0)
    else:
        in_image = image[0, channel - 1, :, :]
    in_image = normalize_array(in_image)

    if layer is None:
        # sum over all feature channels
        activation_1 = np.sum(output_1[0, :, :, :], axis=0)
        activation_2 = np.sum(output_2[0, :, :, :], axis=0)
    else:
        # select a single channel
        activation_1 = output_1[0, layer, :, :]
        activation_2 = output_2[0, layer, :, :]

    if sub_mean:
        # y = |x - <x>|; built as new arrays so the views into the captured
        # feature maps are not modified in place.
        activation_1 = np.abs(activation_1 - np.mean(activation_1))
        activation_2 = np.abs(activation_2 - np.mean(activation_2))

    return output, in_image, activation_1, activation_2


def plot_input(input_image: list, origin='lower'):
    '''Makes the figure for the input image.

    Parameters
    ----------
    input_image : 2-D array-like
        Image passed to ``imshow``.
    origin : str
        ``origin`` argument forwarded to ``imshow``.

    Returns
    -------
    matplotlib.figure.Figure
        Figure with the image, a colorbar and the title 'Input'.
    '''
    plt.rcParams['xtick.labelsize'] = ticks
    plt.rcParams['ytick.labelsize'] = ticks

    input_fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 5))
    im0 = ax.imshow(input_image, cmap=cmap, origin=origin)

    divider = make_axes_locatable(ax)
    cax = divider.append_axes('right', size='5%', pad=0.05)
    input_fig.colorbar(im0, cax=cax, orientation='vertical')

    ax.set_title('Input', fontsize=titles)

    # Drop the figure from pyplot's global registry so repeated requests do
    # not accumulate open figures; the Figure object itself stays usable.
    plt.close(input_fig)
    return input_fig


def plot_activations(activation_1: list, activation_2: list, origin='lower'):
    '''Makes the side-by-side figure of the early and late activations.

    Parameters
    ----------
    activation_1, activation_2 : 2-D array-like
        Activation maps shown left ('Early Activation') and right
        ('Late Activation').
    origin : str
        ``origin`` argument forwarded to ``imshow``.

    Returns
    -------
    matplotlib.figure.Figure
        Figure with both maps, each with its own colorbar.
    '''
    plt.rcParams['xtick.labelsize'] = ticks
    plt.rcParams['ytick.labelsize'] = ticks

    fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(18, 7))
    ax1, ax2 = axs[0], axs[1]

    im1 = ax1.imshow(activation_1, cmap=cmap, origin=origin)
    im2 = ax2.imshow(activation_2, cmap=cmap, origin=origin)

    for ax, im in zip(axs, (im1, im2)):
        divider = make_axes_locatable(ax)
        cax = divider.append_axes('right', size='5%', pad=0.05)
        fig.colorbar(im, cax=cax, orientation='vertical')

    ax1.set_title('Early Activation', fontsize=titles)
    ax2.set_title('Late Activation', fontsize=titles)

    # See plot_input: release the pyplot registry entry, keep the Figure.
    plt.close(fig)
    return fig


def predict_and_analyze(model_name, num_channels, dim, input_channel, image):
    '''
    Loads a model with activations, passes through image and shows activations

    The image must be a numpy array of shape (C, W, W) or (1, C, W, W)

    Parameters
    ----------
    model_name : str
        Model family; lower-cased and must contain 'eff'.
    num_channels : str or int
        Number of input channels; selects the entry of ``effnet_hparams``.
    dim : str or int
        Expected spatial size W of the image.
    input_channel : float or int
        Input channel to display (0 = sum over all channels).
    image :
        Uploaded file object; its ``name`` attribute is the path of the
        ``.npy`` file to load.

    Returns
    -------
    tuple
        ``(prediction text, input figure, mean-subtracted activation figure,
        raw activation figure)``.

    Raises
    ------
    ValueError
        If the data has the wrong shape, or the model name / channel count is
        not supported.
    '''
    model_name = model_name.lower()
    num_channels = int(num_channels)
    W = int(dim)

    print("Running %s for %i channels" % (model_name, num_channels))
    print("Loading data")

    # NOTE(security): allow_pickle=True unpickles arbitrary objects from a
    # user-uploaded file, which can execute code. Plain numeric arrays load
    # with allow_pickle=False; switch once object arrays are confirmed unused.
    image = np.load(image.name, allow_pickle=True)
    image = image.astype(np.float32)
    if image.ndim == 3:
        # add the batch axis
        image = image[np.newaxis, :, :, :]
    image = torch.from_numpy(image)

    expected_shape = (1, num_channels, W, W)
    if tuple(image.shape) != expected_shape:
        raise ValueError("Data is the wrong shape: expected %s, got %s"
                         % (expected_shape, tuple(image.shape)))
    print("Data loaded")

    print("Loading model")
    model_loading_name = "%s_%i_planet_detection" % (model_name, num_channels)

    if 'eff' not in model_name:
        raise ValueError("Unsupported model: %s" % model_name)
    if num_channels not in effnet_hparams:
        raise ValueError("No hyperparameters for %i channels" % num_channels)

    hparams = SimpleNamespace(**effnet_hparams[num_channels])
    model = EfficientNet(dropout=hparams.dropout,
                         num_channels=hparams.num_channels,
                         num_classes=hparams.num_classes,
                         size=hparams.size,
                         stochastic_depth_prob=hparams.stochastic_depth_prob,
                         width_mult=hparams.width_mult,
                         depth_mult=hparams.depth_mult,)

    # NOTE(review): cached_download is deprecated in recent huggingface_hub
    # releases in favour of hf_hub_download — confirm the pinned version.
    model_url = cached_download(hf_hub_url(model_path + model_loading_name,
                                           filename="pytorch_model.bin"))
    loaded = torch.load(model_url, map_location='cpu',)
    model.load_state_dict(loaded['state_dict'])
    # Inference mode: disables dropout / stochastic depth and uses the stored
    # normalization statistics, so predictions are deterministic.
    model.eval()
    print("Model loaded")

    print("Looking at activations")
    output, input_image, activation_1, activation_2 = get_activations(model, image, model_name,
                                                                      channel=input_channel,
                                                                      sub_mean=True)
    print("Activations and predictions finished")

    if output[0][0] < output[0][1]:
        output = 'Planet predicted with %.3f percent confidence' % (100*output[0][1])
    else:
        output = 'No planet predicted with %.3f percent confidence' % (100*output[0][0])
    print(output)

    input_image = normalize_array(input_image)
    activation_1 = normalize_array(activation_1)
    activation_2 = normalize_array(activation_2)

    # convert input image to RGB (unused for now since not outputting actual image)
    # input_pil_image = Image.fromarray(np.uint8(cm.magma(input_image)*255))

    print("Plotting")
    origin = 'lower'

    # plot input image
    input_fig = plot_input(input_image, origin=origin)

    # plot mean subtracted activations
    fig1 = plot_activations(activation_1, activation_2, origin=origin)

    # plot raw activations
    _, _, activation_1, activation_2 = get_activations(model, image, model_name,
                                                       channel=input_channel,
                                                       sub_mean=False)
    activation_1 = normalize_array(activation_1)
    activation_2 = normalize_array(activation_2)
    fig2 = plot_activations(activation_1, activation_2, origin=origin)

    print("Sending to Hugging Face")

    return output, input_fig, fig1, fig2


if __name__ == "__main__":

    demo = gr.Interface(
        fn=predict_and_analyze,
        inputs=[gr.Dropdown(["EfficientNet"],
                            # "RegNet"],
                            value="EfficientNet",
                            label="Model Selection",
                            show_label=True),
                gr.Dropdown(["47", "61", "75"],
                            value="61",
                            label="Number of Velocity Channels",
                            show_label=True),
                gr.Dropdown(["600"],
                            value="600",
                            label="Image Dimensions",
                            show_label=True),
                gr.Number(value=0.,
                          label="Input Channel to show (0 = sum over all)",
                          show_label=True),
                gr.File(label="Input Data", show_label=True)],
        outputs=[gr.Textbox(lines=1, label="Prediction", show_label=True),
                 # gr.Image(label="Input Image", show_label=True),
                 gr.Plot(label="Input Image", show_label=True),
                 gr.Plot(label="Mean-Subtracted Activations", show_label=True),
                 gr.Plot(label="Raw Activations", show_label=True)
                 ],
        title="Kinematic Planet Detector"
    )
    demo.launch()