Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import os, sys | |
| import pandas as pd | |
| import pickle | |
| import cv2 | |
| import random | |
| from glob import glob | |
| from tqdm import tqdm | |
| from sklearn import svm | |
| import torch | |
| # from IPython.display import Video | |
| # from IPython.display import HTML | |
| # from ipywidgets import Output, GridspecLayout | |
| # from IPython import display | |
| import gradio as gr | |
def load_feats(fpath):
    """Load a pickled feature table and split it into per-sample collections.

    The pickle is expected to hold a 2-D array-like indexed as
    ``features[row, column]`` (its ``.shape`` is printed after loading).
    Columns are consumed by position:

    * column 0  -> stacked into a float array ``x``
    * column 1  -> converted with ``int`` into the array ``y``
    * column 2  -> collected into the array ``files``
    * last column -> returned untouched as the list ``data``

    Args:
        fpath: Path of the pickle file to read.

    Returns:
        Tuple ``(x, y, files, data)`` with one entry per row of the table.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(fpath, 'rb') as f:
        features = pickle.load(f)
    print("Features: ", features.shape)

    # Index the columns directly instead of slicing out (N, 1) views and
    # unwrapping every row by hand.
    x = np.array(list(features[:, 0])).astype(float)
    y = np.array([int(label) for label in features[:, 1]])
    files = np.array(list(features[:, 2]))
    data = list(features[:, -1])
    return x, y, files, data
def get_binary_labels_exemplar(labels, target_label=0):
    """Map class labels to +1 / -1 for one-vs-rest training.

    Args:
        labels: Iterable of class labels.
        target_label: Label that should be treated as the positive class.

    Returns:
        NumPy array with ``1`` where the label equals ``target_label`` and
        ``-1`` everywhere else, in the same order as ``labels``.
    """
    return np.array([1 if lab == target_label else -1 for lab in labels])
def find_ind(target_word, labels, word_map):
    """Return the positions in ``labels`` that belong to ``target_word``.

    Args:
        target_word: Key to look up in ``word_map``.
        labels: Sequence of labels; each entry is compared after ``int()``.
        word_map: Mapping from word to its integer class label.

    Returns:
        List of indices ``i`` for which ``int(labels[i])`` equals the label
        mapped to ``target_word``.

    Raises:
        KeyError: If ``target_word`` is not a key of ``word_map``.
    """
    target_label = word_map[target_word]
    return [i for i, label in enumerate(labels) if int(label) == target_label]
def get_scores_sklearn(x, clf):
    """Score every sample against the first weight vector of a linear model.

    The score is the plain dot product between a sample and ``clf.coef_[0]``;
    no intercept is added.

    Args:
        x: Sequence of equally sized feature vectors (or a 2-D array).
        clf: Fitted estimator exposing a 2-D ``coef_`` attribute.

    Returns:
        List with one NumPy float score per sample, in input order.
    """
    if len(x) == 0:
        return []
    weights = np.asarray(clf.coef_, dtype=float)[0]
    # One matrix-vector product replaces the per-sample reshape + dot loop.
    return list(np.asarray(x, dtype=float) @ weights)
def get_top_labels(scores, labels, files):
    """Pair every file with its label, ordered from highest to lowest score.

    Args:
        scores: One score per sample (tensor, array or plain sequence).
        labels: One class label per sample; converted to ``int``.
        files: One file identifier per sample.

    Returns:
        List of ``(file, label)`` tuples sorted by descending score.
    """
    # as_tensor / asarray let callers pass plain lists as well as tensors/arrays.
    _, order = torch.sort(torch.as_tensor(scores), descending=True)
    classes = np.asarray(labels).astype(int)
    return [(files[i], classes[i]) for i in order.tolist()]
def plot_ranked_videos(all_labels, scores, data, key_list, val_list, row=2, col=3, data_path="videos/", fps=15, target_label=0):
    """Render the highest-scoring samples as an HTML table of video players.

    Samples are ranked by descending score and placed left to right, top to
    bottom, in a grid of at most ``row`` x ``col`` cells. A cell's caption is
    green when the sample's label equals ``target_label`` and red otherwise.

    Args:
        all_labels: Class label of every sample, indexable by sample index.
        scores: One score per sample (tensor or array-like).
        data: Per-sample records exposing ``speaker``, ``file``,
            ``start_frame`` and ``end_frame`` attributes.
        key_list: Words; ``key_list[k]`` is the word whose label is
            ``val_list[k]``.
        val_list: Class labels, parallel to ``key_list``.
        row: Maximum number of table rows.
        col: Number of cells per table row.
        data_path: Directory prefix used to build the video URL.
        fps: Frame rate used to convert frame numbers to seconds.
        target_label: Label counted as a correct retrieval.

    Returns:
        HTML string containing the table.

    Raises:
        ValueError: If a sample's label does not occur in ``val_list``.
    """
    _, perm = torch.sort(torch.as_tensor(scores), descending=True)
    # Small datasets may hold fewer samples than the grid has cells, so cap
    # the selection at what is actually available.
    top = perm.tolist()[:max(row, 0) * max(col, 0)]

    cells = []
    for sample_idx in top:
        sample = data[sample_idx]
        label = all_labels[sample_idx]
        # Reverse lookup label -> word; indexing val_list with the label is
        # only right when labels happen to equal their own positions.
        word = key_list[val_list.index(label)]
        vid_file = os.path.join("/file=" + data_path, sample.speaker, sample.file + ".mp4")
        start_time, end_time = sample.start_frame / fps, sample.end_frame / fps
        # One second of context on each side; a media fragment cannot start
        # before 0, so clamp the lower bound.
        vid_src = "{}#t={},{}".format(vid_file, max(start_time - 1, 0.0), end_time + 1)
        color = "green" if label == target_label else "red"
        cells.append("""<td><center> <b><font color=%s size=5>%s</font></b> (%.1f - %.1f seconds)</center>
<video width="320" height="240" controls>
<source src="%s" type="video/mp4">
</video>
<center> (%s - %s) </center>
</td>""" % (color, word, start_time, end_time, vid_src, sample_idx, sample.file))

    parts = ["""<div>
<b> <font size=5> Top retrieved files: </font> </b> <br>
<table>
"""]
    for first in range(0, len(cells), max(col, 1)):
        parts.append("<tr>")
        parts.extend(cells[first:first + col])
        parts.append("</tr>")
    parts.append("""</table>
</div>""")
    return "".join(parts)
def plot_query(query_idx, data, key_list, val_list, data_path="videos/", fps=15, target_label=0):
    """Render the positive query sample as an HTML snippet with a video player.

    Args:
        query_idx: Index of the query sample inside ``data``.
        data: Per-sample records exposing ``speaker``, ``file``,
            ``start_frame`` and ``end_frame`` attributes.
        key_list: Words; ``key_list[k]`` is the word whose label is
            ``val_list[k]``.
        val_list: Class labels, parallel to ``key_list``.
        data_path: Directory prefix used to build the video URL.
        fps: Frame rate used to convert frame numbers to seconds.
        target_label: Label of the query sample's word.

    Returns:
        HTML string describing and embedding the query clip.

    Raises:
        ValueError: If ``target_label`` does not occur in ``val_list``.
    """
    sample = data[query_idx]
    # Reverse lookup label -> word; indexing val_list with the label is only
    # right when labels happen to equal their own positions.
    word = key_list[val_list.index(target_label)]
    vid_file = os.path.join("/file=" + data_path, sample.speaker, sample.file + ".mp4")
    start_time, end_time = sample.start_frame / fps, sample.end_frame / fps
    # One second of context on each side; a media fragment cannot start
    # before 0, so clamp the lower bound.
    vid_src = "{}#t={},{}".format(vid_file, max(start_time - 1, 0.0), end_time + 1)
    color = "green"
    plot = (""" <b> <font size=5> Positive sample used to train Exemplar-SVM: </font> </b> <br> Query index in the data: %d <br>
<b><font color=%s size=5>%s</font></b> (%.1f - %.1f seconds) <br>
<video width="320" height="240" controls>
<source src="%s" type="video/mp4">
</video> <br>
(%s)
""" % (query_idx, color, word, start_time, end_time, vid_src, sample.file))
    return plot
# Feature file and word -> class-label mapping for every selectable set of
# word classes.
_FEATURE_SETS = {
    "big_little": (
        "features/gestsync_feats_biglittle_train.pkl",
        {"big": 0, "little": 1},
    ),
    "5_words": (
        "features/gestsync_feats_5words_balanced_train.pkl",
        {'big': 0, 'little': 1, 'next': 2, 'i': 3, 'you': 4},
    ),
}


def _bad_query_index_message(query_idx, target_word, pos_indices):
    """Build the error text shown when the query index cannot be used."""
    return "The input query index selected ({}) is not the right index! Please select an index of the positive sample to include in Exemplar-SVM training OR type 'Random' to randomly chose an index. <br> For the selected target word '{}', the following are the positive indices in the dataset that can be selected: <br> {}".format(query_idx, target_word, pos_indices)


def retrieve(file_choice, target_word, query_idx, rows=5, cols=4):
    """Train an Exemplar-SVM on one positive sample and rank the whole dataset.

    The classifier is fitted on every sample that does NOT belong to
    ``target_word`` (negatives) plus the single positive sample selected by
    ``query_idx``. All samples are then scored with the learned weights and
    the best ``rows * cols`` are rendered as HTML.

    Args:
        file_choice: Name of the word-class set ("big_little" or "5_words").
        target_word: Word whose samples count as positives.
        query_idx: Index of the positive sample to train on, or the string
            "Random" to pick one of the positive samples at random.
        rows: Number of rows in the result grid.
        cols: Number of columns in the result grid.

    Returns:
        Tuple ``(query_html, results_html)`` on success, or
        ``(error_message, "")`` when the inputs cannot be used.
    """
    print(file_choice)
    if file_choice not in _FEATURE_SETS:
        # Previously an unknown choice fell through and crashed on an
        # unassigned variable.
        msg = "The selected word classes '{}' are not available. Please select one of the following: {}".format(file_choice, list(_FEATURE_SETS))
        return msg, ""
    feature_path, word_map = _FEATURE_SETS[file_choice]

    print("Target word: ", target_word)
    if target_word not in word_map:
        msg = "The selected target word is not present in the word classes. For '{}' word classes, please select the target word from the following list: {}".format(file_choice, list(word_map.keys()))
        return msg, ""
    target_label = word_map[target_word]
    print("Target label: ", target_label)
    key_list = list(word_map.keys())
    val_list = list(word_map.values())

    x_train, y_train_actual, files_train, data_rows = load_feats(feature_path)
    print("X train: {} | Y train: {} | Files train: {} | Data rows test: {}".format(x_train.shape, y_train_actual.shape, files_train.shape, len(data_rows)))

    print("Query idx orig: ", query_idx)
    pos_indices = find_ind(target_word, y_train_actual, word_map)
    if query_idx == "Random":
        if not pos_indices:
            # random.choice raises IndexError on an empty list.
            msg = "No samples of the target word '{}' are present in the dataset, so no positive sample can be selected.".format(target_word)
            return msg, ""
        query_idx = random.choice(pos_indices)
    else:
        text = str(query_idx).strip()
        # isdecimal() accepts exactly the digits int() can parse, whereas
        # isnumeric() also accepts characters such as superscripts.
        usable = text.isdecimal()
        if usable:
            query_idx = int(text)
            usable = query_idx in pos_indices
        if not usable:
            return _bad_query_index_message(query_idx, target_word, pos_indices), ""
    print("Query idx: ", query_idx)

    y_train = get_binary_labels_exemplar(y_train_actual, target_label)

    # Training set = all negatives plus the one chosen positive, kept in
    # dataset order.
    keep = y_train == -1
    keep[query_idx] = True
    print(query_idx, y_train[query_idx])
    svm_x_train = x_train[keep].astype(float)
    svm_y_train = y_train[keep].astype(float)
    print("SVM dataset - X: {} | Y: {}".format(svm_x_train.shape, svm_y_train.shape))

    clf = svm.SVC(C=1, kernel="linear")
    clf.fit(svm_x_train, svm_y_train)
    scores = get_scores_sklearn(x_train, clf)

    query_plot = plot_query(query_idx, data_rows, key_list, val_list, target_label=target_label)
    retrieved_plots = plot_ranked_videos(y_train_actual, torch.tensor(scores), data_rows, key_list, val_list, row=rows, col=cols, target_label=target_label)
    return query_plot, retrieved_plots
if __name__ == "__main__":
    # Page layout, top to bottom: title, input controls, "Retrieve" button,
    # the query clip, and finally the grid of retrieved clips.
    title_markup = "<b> <font size=5> <center> Welcome to Exemplar-SVM Training and Visualization of top-k videos </center> </font> </b>"

    with gr.Blocks() as demo:
        with gr.Row():
            gr.HTML(title_markup)

        with gr.Row():
            features = gr.Dropdown(
                choices=["big_little", "5_words"],
                label="Word classes",
                value="big_little",
            )
            word = gr.Radio(
                choices=["big", "little", "i", "you", "next"],
                label="Target word",
                value="big",
            )
            query_idx = gr.Textbox(
                value="Random",
                label="Positive sample index to train Exemplar-SVM (Type 'Random' to randomly select the index)",
            )

        with gr.Row():
            submit = gr.Button("Retrieve")

        with gr.Row():
            query = gr.HTML()

        with gr.Row():
            ret_videos = gr.HTML()

        # Clicking the button runs the retrieval and fills both HTML panes.
        submit.click(
            fn=retrieve,
            inputs=[features, word, query_idx],
            outputs=[query, ret_videos],
        )

    # allowed_paths lets the "/file=" video URLs built by the plot helpers
    # be served from the working directory.
    demo.launch(allowed_paths=["."])