import os import glob import json import time import random import re from functools import partial import numpy as np import torch from PIL import Image import gradio as gr from scipy.spatial.distance import cdist from sklearn.metrics import silhouette_samples from sklearn.cluster import KMeans from sklearn.manifold import TSNE import matplotlib.pyplot as plt import matplotlib.cm as cm from argparse import ArgumentParser from datetime import datetime from loguru import logger import io from transformers import CLIPModel, CLIPProcessor # Set environment variable for Gradio os.environ["GRADIO_ALLOWED_PATHS"] = "False" # Global variables to hold state GLOBAL_STATE = { "participant_id": None, "embedding_created": False, "initial_clustering_done": False, "initial_labels_updated": False, "boundary_samples_labeled": False, "embeddings": None, "image_paths": None, "cluster_labels": None, "current_k": 8, "label_dict": {}, "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"), "model": None, "preprocess": None, "constraints": [], "silhouette_scores": None, "boundary_samples": [], "boundary_labels": {}, "boundary_index": 0, "next_cluster_id": None, "representative_images": {}, "random_seed": 42, "original_images": {}, "show_bbox_dict": {}, "hide_bbox_dict": {}, "new_label_inputs": {}, "initial_kmeans_labels": None, "initial_label_dict": None, "kmeans_matched_labels": None, "kmeans_matched_reps": None, "evaluation_choice": None, "evaluation_layout_is_left_hitl": True, "annotation_start_time": None, "annotation_end_time": None, "annotation_duration_seconds": None, } # -------------------------------------------------------------- # Helper functions # -------------------------------------------------------------- def load_bbox_json(bbox_json_path): try: with open(bbox_json_path, 'r', encoding='utf-8') as f: bbox_data = json.load(f) GLOBAL_STATE["show_bbox_dict"] = bbox_data.get("Show", {}) GLOBAL_STATE["hide_bbox_dict"] = bbox_data.get("Hide", {}) except Exception as e: logger.error(f"Failed to load bounding box JSON: {e}") def create_masked_image(image: Image.Image): hide_bbox_dict = GLOBAL_STATE.get("hide_bbox_dict", {}) if not hide_bbox_dict: return image masked_img = image.copy() for _, box_coords in hide_bbox_dict.items(): box = (box_coords['left'], box_coords['top'], box_coords['right'], box_coords['bottom']) black_rectangle = Image.new('RGB', (box[2] - box[0], box[3] - box[1]), color='black') masked_img.paste(black_rectangle, (box[0], box[1])) return masked_img # -------------------------------------------------------------- # CLIP model and Embedding # -------------------------------------------------------------- # ------------------ 絶対に必要な部分 ------------------ def load_clip_model(device="cuda"): hf_model_id = "apple/DFN2B-CLIP-ViT-L-14" model = CLIPModel.from_pretrained(hf_model_id).to(device) model.eval() BASE_PROC = "openai/clip-vit-large-patch14" preprocess = CLIPProcessor.from_pretrained(BASE_PROC) return model, preprocess # ------------------------------------------------------- # def load_clip_model(device="cuda"): # # ローカルのモデルが保存されているパス # local_model_path = "./clip_model_cache" # # # ローカルパスの存在をチェック # if not os.path.isdir(local_model_path): # # エラーメッセージをより具体的にする # error_msg = f"CLIP model not found at the specified local path: {local_model_path}. Please make sure the model is downloaded and placed in the correct directory." # logger.error(error_msg) # raise FileNotFoundError(error_msg) # # logger.info(f"Loading model and processor from local path: {local_model_path}") # # # ローカルパスからモデルとプロセッサを読み込む # model = CLIPModel.from_pretrained(local_model_path).to(device) # preprocess = CLIPProcessor.from_pretrained(local_model_path) # # model.eval() # return model, preprocess def resize_with_padding(img, target_size=(224, 224), fill_color=(0, 0, 0)): img.thumbnail(target_size, Image.Resampling.LANCZOS) new_img = Image.new("RGB", target_size, fill_color) paste_x = (target_size[0] - img.width) // 2 paste_y = (target_size[1] - img.height) // 2 new_img.paste(img, (paste_x, paste_y)) return new_img def get_face_region_embeddings(image: Image.Image, model, preprocess, device="cuda"): show_bbox_dict = GLOBAL_STATE.get("show_bbox_dict", {}) if not show_bbox_dict: logger.warning("No 'Show' regions defined. Using whole image.") processed_image = resize_with_padding(image) inputs = preprocess(images=processed_image, return_tensors="pt").to(device) with torch.no_grad(): emb = model.get_image_features(**inputs) return (emb / emb.norm(dim=-1, keepdim=True)).cpu().numpy().flatten() region_embeddings = [] for region_name, coords in show_bbox_dict.items(): try: box = (coords['left'], coords['top'], coords['right'], coords['bottom']) region_img = image.crop(box) processed_region = resize_with_padding(region_img) inputs = preprocess(images=processed_region, return_tensors="pt").to(device) with torch.no_grad(): emb = model.get_image_features(**inputs) region_embeddings.append(emb / emb.norm(dim=-1, keepdim=True)) except Exception as e: logger.error(f"Failed to process region {region_name}: {e}") continue if not region_embeddings: logger.error("No regions were successfully processed.") return None mean_emb = torch.mean(torch.cat(region_embeddings, dim=0), dim=0, keepdim=True) final_emb = mean_emb / mean_emb.norm(dim=-1, keepdim=True) return final_emb.cpu().numpy().flatten() def create_embeddings(image_folder, bbox_json_path): start_time = time.time() yield "Loading bounding box info..." load_bbox_json(bbox_json_path) if GLOBAL_STATE["model"] is None: yield "Loading CLIP model..." GLOBAL_STATE["model"], GLOBAL_STATE["preprocess"] = load_clip_model(device=GLOBAL_STATE["device"]) yield "Scanning for images..." supported_formats = ['*.jpg', '*.jpeg', '*.png'] image_paths = sorted([p for fmt in supported_formats for p in glob.glob(os.path.join(image_folder, fmt))]) GLOBAL_STATE["image_paths"] = image_paths GLOBAL_STATE["original_images"] = {} embeddings = [] total_images = len(image_paths) yield f"Processing {total_images} images..." for i, img_path in enumerate(image_paths): if i % 10 == 0: yield f"Processing image {i + 1}/{total_images}: {os.path.basename(img_path)}" try: img = Image.open(img_path).convert('RGB') GLOBAL_STATE["original_images"][img_path] = img emb = get_face_region_embeddings(img, GLOBAL_STATE["model"], GLOBAL_STATE["preprocess"], device=GLOBAL_STATE["device"]) if emb is not None: embeddings.append(emb) except Exception as e: logger.error(f"Error processing {img_path}: {e}") if not embeddings: yield "## Failed to create any valid embeddings." return GLOBAL_STATE["embeddings"] = np.vstack(embeddings) GLOBAL_STATE["embedding_created"] = True logger.info(f"Embedding creation took {time.time() - start_time:.2f} seconds.") yield "## Successfully Created Embeddings / 埋め込みの作成に成功しました" # -------------------------------------------------------------- # Clustering and Annotation Logic # -------------------------------------------------------------- # ############################################################ # ### 欠陥3の修正: 代表画像の選択ロジックを修正 ### # ############################################################ def get_representative_images(labels, n_representatives=3): """ Get representative images for each cluster. Prioritizes manually fixed samples, then selects images closest to cluster centers. """ if GLOBAL_STATE["embeddings"] is None: return {} embeddings = GLOBAL_STATE["embeddings"] image_paths = GLOBAL_STATE["image_paths"] # Calculate cluster centers (still needed for non-fixed samples) cluster_centers = {} for label in np.unique(labels): cluster_points = embeddings[labels == label] if len(cluster_points) > 0: cluster_centers[label] = np.mean(cluster_points, axis=0) else: # Fallback for empty clusters cluster_centers[label] = np.zeros(embeddings.shape[1]) cluster_representatives = {} for label in np.unique(labels): cluster_indices = np.where(labels == label)[0] if len(cluster_indices) == 0: cluster_representatives[label] = [] continue # --- Prioritize fixed samples --- fixed_idx_in_cluster = sorted(list(set([ c[1] for c in GLOBAL_STATE.get("constraints", []) if c[0] == "fixed-cluster" and c[2] == label and c[1] in cluster_indices ]))) other_idx_in_cluster = [i for i in cluster_indices if i not in fixed_idx_in_cluster] # --- Prepare the list of representatives --- # Add fixed samples first, with distance 0.0 (as they are definitional) representatives = [(image_paths[i], 0.0, i) for i in fixed_idx_in_cluster] # --- Fill remaining spots with closest samples --- remaining_spots = n_representatives - len(representatives) if remaining_spots > 0 and other_idx_in_cluster: center = cluster_centers[label] other_embeddings = embeddings[other_idx_in_cluster] distances = cdist(other_embeddings, [center]).flatten() # Get the original indices sorted by distance sorted_other_indices_global = [other_idx_in_cluster[i] for i in np.argsort(distances)] for i in range(min(remaining_spots, len(sorted_other_indices_global))): original_idx = sorted_other_indices_global[i] # Find the distance corresponding to this original index dist_idx = np.where(np.array(other_idx_in_cluster) == original_idx)[0][0] distance = distances[dist_idx] representatives.append((image_paths[original_idx], distance, original_idx)) cluster_representatives[label] = representatives return cluster_representatives # ############################################################ # ### 欠陥2の修正: 制約自動生成ロジックをヘルパー関数として追加 ### # ############################################################ def set_representative_samples_as_fixed(representative_images): """ Sets the most representative samples of each cluster as fixed-cluster constraints. This anchors the key samples to their clusters for subsequent constrained clustering. """ new_constraints = [] # For each cluster, set the most representative sample (closest to centroid) as fixed for cluster_id, representatives in representative_images.items(): if not representatives: continue # Get the most representative sample (first in the list, sorted by distance) _, _, sample_idx = representatives[0] # Create a fixed-cluster constraint constraint = ("fixed-cluster", sample_idx, int(cluster_id)) # Check if this constraint already exists if constraint not in GLOBAL_STATE["constraints"]: GLOBAL_STATE["constraints"].append(constraint) new_constraints.append(constraint) logger.info(f"Added fixed constraint for cluster {cluster_id}, sample {sample_idx}") return new_constraints # ############################################################ # ### 欠陥2の修正: 初期クラスタリングに関数を組み込む ### # ############################################################ def perform_initial_clustering(): if GLOBAL_STATE["embeddings"] is None: return "Embeddings not created." k = GLOBAL_STATE["current_k"] kmeans = KMeans(n_clusters=k, random_state=GLOBAL_STATE["random_seed"], n_init=10) cluster_labels = kmeans.fit_predict(GLOBAL_STATE["embeddings"]) GLOBAL_STATE["cluster_labels"] = cluster_labels GLOBAL_STATE["initial_kmeans_labels"] = cluster_labels.copy() GLOBAL_STATE["silhouette_scores"] = silhouette_samples(GLOBAL_STATE["embeddings"], GLOBAL_STATE["cluster_labels"]) GLOBAL_STATE["label_dict"] = {str(i): "" for i in range(k)} GLOBAL_STATE["next_cluster_id"] = k # Get representative images representatives = get_representative_images(GLOBAL_STATE["cluster_labels"], n_representatives=1) GLOBAL_STATE["representative_images"] = representatives # --- 欠陥2の修正: 自動で制約を生成 --- # 既存の制約をクリア GLOBAL_STATE["constraints"] = [] # 代表サンプルを固定する制約を追加 set_representative_samples_as_fixed(representatives) logger.info( f"Automatically generated {len(GLOBAL_STATE['constraints'])} fixed constraints for representative samples.") # --- 修正ここまで --- GLOBAL_STATE["initial_clustering_done"] = True return f"Performed initial clustering with {k} clusters and automatically fixed representative samples. / {k}個のクラスタで初期クラスタリングを実行し、代表サンプルを自動的に固定しました。" def update_initial_labels(*labels): for i in range(GLOBAL_STATE["current_k"]): if not labels[i] or not labels[i].strip(): error_msg = "
Please fill in all cluster labels before proceeding. / 先に進む前に、すべてのクラスタラベルを入力してください。
" return error_msg, gr.update(), gr.update(visible=False), gr.update(interactive=False) for i in range(GLOBAL_STATE["current_k"]): if ',' in labels[i]: error_msg = f"Error in Cluster {i}: Label '{labels[i]}' contains a comma. Please use a single word or phrase. / クラスタ{i}のエラー: ラベル「{labels[i]}」にカンマが含まれています。単一の単語またはフレーズを使用してください。
" return error_msg, gr.update(), gr.update(visible=False), gr.update(interactive=False) for i, label in enumerate(labels): if i < GLOBAL_STATE["current_k"]: GLOBAL_STATE["label_dict"][str(i)] = label.strip() GLOBAL_STATE["initial_label_dict"] = GLOBAL_STATE["label_dict"].copy() GLOBAL_STATE["initial_labels_updated"] = True status = "## Labels updated. / ラベルを更新しました。" done_msg = "初期ラベルを保存しました。このラベルは最後に変更可能です。次の「境界サンプル」タブに進んでください。
Initial labels saved. These can be changed later. Proceed to the 'Boundary Samples' tab.
No other clusters available.
"}New cluster label cannot be empty. / 新規クラスタのラベルは空にできません。
" elif ',' in stripped_label: error_msg = f"Label '{new_label}' contains a comma. Please use a single word or phrase. / ラベル「{new_label}」にカンマが含まれています。単一の単語またはフレーズを使用してください。
" if error_msg: dummy_updates = [gr.update()] * 23 return [error_msg] + dummy_updates + [gr.update(), gr.update()] idx = GLOBAL_STATE["boundary_samples"][GLOBAL_STATE["boundary_index"]] new_cid = GLOBAL_STATE["next_cluster_id"] GLOBAL_STATE["next_cluster_id"] += 1 GLOBAL_STATE["current_k"] += 1 GLOBAL_STATE["label_dict"][str(new_cid)] = stripped_label # --- 既存の制約を削除し、新しい制約を追加 --- GLOBAL_STATE["constraints"] = [c for c in GLOBAL_STATE["constraints"] if not (c[0] == "fixed-cluster" and c[1] == idx)] GLOBAL_STATE["constraints"].append(("fixed-cluster", idx, new_cid)) GLOBAL_STATE["boundary_labels"][idx] = new_cid GLOBAL_STATE["boundary_samples_labeled"] = True # ############################################################ # ### ★★★★★ ここが重要な修正点 ★★★★★ ### # ############################################################ # グローバルなクラスタラベル配列を直接更新する GLOBAL_STATE["cluster_labels"][idx] = new_cid # 更新されたクラスタラベル全体を使って代表画像を再計算する GLOBAL_STATE["representative_images"] = get_representative_images( GLOBAL_STATE["cluster_labels"], n_representatives=1 ) # ############################################################ img, status, _, button_updates = get_current_boundary_sample() return [f"Created new cluster ({new_cid}): {stripped_label}", img, status, ""] + button_updates + [gr.update(), gr.update()] # ############################################################ # ### 欠陥1の修正: COP-KMeansアルゴリズムとヘルパー関数を追加 ### # ############################################################ def kmpp_initialization(dataset, k, fixed_assignments): """KMeans++ initialization, respecting fixed assignments.""" rng = np.random.RandomState(GLOBAL_STATE["random_seed"]) fixed_clusters = {} for idx, cluster in fixed_assignments: if cluster not in fixed_clusters: fixed_clusters[cluster] = [] fixed_clusters[cluster].append(idx) centers = [] remaining_k = k # Initialize centers for clusters with fixed assignments for cluster_id in range(k): if cluster_id in fixed_clusters: centers.append(dataset[fixed_clusters[cluster_id]].mean(axis=0)) remaining_k -= 1 # Fill remaining centers using k-means++ logic if remaining_k > 0: if not centers: first_idx = rng.choice(len(dataset)) centers.append(dataset[first_idx]) remaining_k -= 1 while remaining_k > 0: distances = np.min([np.sum((dataset - center) ** 2, axis=1) for center in centers], axis=0) # Avoid division by zero if all distances are zero if distances.sum() == 0: next_idx = rng.choice(np.where(distances == 0)[0]) else: next_idx = rng.choice(len(dataset), p=distances / distances.sum()) centers.append(dataset[next_idx]) remaining_k -= 1 return np.array(centers) def satisfies_constraints(idx, cluster_id, labels, ml, cl): """Check if assigning point idx to cluster_id satisfies all constraints.""" for i, j in ml: if i == idx and labels[j] != -1 and labels[j] != cluster_id: return False if j == idx and labels[i] != -1 and labels[i] != cluster_id: return False for i, j in cl: if i == idx and labels[j] == cluster_id: return False if j == idx and labels[i] == cluster_id: return False return True def find_nearest_cluster(point, centers, labels, ml, cl, idx): """Find the nearest cluster center that satisfies all constraints.""" distances = np.sum((centers - point) ** 2, axis=1) sorted_idx = np.argsort(distances) for cluster_id in sorted_idx: if satisfies_constraints(idx, cluster_id, labels, ml, cl): return cluster_id return None def cop_kmeans(dataset, k, ml=None, cl=None, fixed_labels=None, max_iter=300, tol=1e-4, anchor_weight=10.0): """Constrained K-means clustering with Must-Link, Cannot-Link, and weighted anchor constraints.""" ml = ml or [] cl = cl or [] fixed_labels = fixed_labels or {} fixed_assignments = list(fixed_labels.items()) cluster_centers = kmpp_initialization(dataset, k, fixed_assignments) cluster_labels = np.full(len(dataset), -1) for idx, cluster in fixed_assignments: cluster_labels[idx] = cluster for iter_count in range(max_iter): old_labels = cluster_labels.copy() violated = False for i in range(len(dataset)): if i in fixed_labels: continue nearest_cluster = find_nearest_cluster(dataset[i], cluster_centers, cluster_labels, ml, cl, i) if nearest_cluster is None: violated = True break cluster_labels[i] = nearest_cluster if violated: logger.warning("Constraints violated, cannot find a valid assignment. Re-initializing might be needed.") return None for j in range(k): idx_in_cluster = np.where(cluster_labels == j)[0] if len(idx_in_cluster) == 0: continue fixed_idx_in_cluster = [i for i in idx_in_cluster if i in fixed_labels and fixed_labels[i] == j] normal_idx_in_cluster = [i for i in idx_in_cluster if i not in fixed_idx_in_cluster] anchor_emb = dataset[fixed_idx_in_cluster] if fixed_idx_in_cluster else np.empty((0, dataset.shape[1])) others_emb = dataset[normal_idx_in_cluster] if normal_idx_in_cluster else np.empty((0, dataset.shape[1])) if len(fixed_idx_in_cluster) > 0: w_anchor = np.full(len(fixed_idx_in_cluster), anchor_weight) w_others = np.ones(len(normal_idx_in_cluster)) all_emb = np.vstack([anchor_emb, others_emb]) w = np.concatenate([w_anchor, w_others])[:, None] cluster_centers[j] = (w * all_emb).sum(axis=0) / w.sum() elif len(normal_idx_in_cluster) > 0: cluster_centers[j] = others_emb.mean(axis=0) if np.all(cluster_labels == old_labels): break return cluster_labels # ############################################################ # ### 欠陥1の修正: 制約付きクラスタリング実行関数を修正 ### # ############################################################ def perform_constrained_clustering(): """Perform constrained clustering based on user-provided constraints.""" if GLOBAL_STATE["embeddings"] is None: return "No embeddings available. Please create embeddings first.", None # Prepare constraints for COP-KMeans ml_constraints = [] cl_constraints = [] fixed_labels = {} for constraint_type, idx1, idx2_or_cluster in GLOBAL_STATE["constraints"]: if constraint_type == "must-link": ml_constraints.append((idx1, idx2_or_cluster)) elif constraint_type == "cannot-link": cl_constraints.append((idx1, idx2_or_cluster)) elif constraint_type == "fixed-cluster": fixed_labels[idx1] = idx2_or_cluster k = GLOBAL_STATE["current_k"] # Run constrained K-means new_labels = cop_kmeans( GLOBAL_STATE["embeddings"], k, ml=ml_constraints, cl=cl_constraints, fixed_labels=fixed_labels ) if new_labels is None: return "Constrained clustering failed. Constraints may be inconsistent.", None GLOBAL_STATE["cluster_labels"] = new_labels GLOBAL_STATE["silhouette_scores"] = silhouette_samples( GLOBAL_STATE["embeddings"], GLOBAL_STATE["cluster_labels"] ) # Update representative images after re-clustering GLOBAL_STATE["representative_images"] = get_representative_images( GLOBAL_STATE["cluster_labels"], n_representatives=1 ) return f"Performed constrained clustering with {k} clusters.", None # -------------------------------------------------------------- # Finalization and Export # -------------------------------------------------------------- class NumpyEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() return super(NumpyEncoder, self).default(obj) def update_final_labels(*labels): for i in range(GLOBAL_STATE["current_k"]): # Add a check to ensure the label is not None before processing if labels[i] is None: error_msg = f"Error in Cluster {i}: Final label cannot be empty. Please provide a label. / クラスタ{i}のエラー: 最終ラベルは空にできません。ラベルを入力してください。
" return gr.update(value=error_msg, visible=True), gr.update(visible=False), gr.update(interactive=False) stripped_label = labels[i].strip() if not stripped_label: error_msg = f"Error in Cluster {i}: Final label cannot be empty. / クラスタ{i}のエラー: 最終ラベルは空にできません。
" return gr.update(value=error_msg, visible=True), gr.update(visible=False), gr.update(interactive=False) if ',' in stripped_label: error_msg = f"Error in Cluster {i}: Label '{labels[i]}' contains a comma. Please use a single word or phrase. / クラスタ{i}のエラー: ラベル「{labels[i]}」にカンマが含まれています。単一の単語またはフレーズを使用してください。
" return gr.update(value=error_msg, visible=True), gr.update(visible=False), gr.update(interactive=False) for i, label in enumerate(labels): if i < GLOBAL_STATE["current_k"]: GLOBAL_STATE["label_dict"][str(i)] = label.strip() end_time = time.time() GLOBAL_STATE["annotation_end_time"] = end_time if GLOBAL_STATE.get("annotation_start_time"): duration = end_time - GLOBAL_STATE["annotation_start_time"] GLOBAL_STATE["annotation_duration_seconds"] = duration logger.info(f"Annotation task time: {duration:.2f} seconds.") guidance_message = """Final labels updated. / 最終ラベルを更新しました。
Please take a 10-minute break before proceeding to the next step. After the break, please move to the "Evaluation & Export" tab.
次のステップに進む前に10分間の休憩を取ってください。休憩後、「評価とエクスポート」タブに進んでください。
Participant ID is missing.
" args = get_parameters() pipeline_path = os.path.join(args.output_dir, participant_id) os.makedirs(pipeline_path, exist_ok=True) results_path = os.path.join(args.results_dir, participant_id) os.makedirs(results_path, exist_ok=True) json_filename = f"{args.labels_name}_{participant_id}.json" pipeline_json_path = os.path.join(pipeline_path, json_filename) results_json_path = os.path.join(results_path, json_filename) logger.info(f"Preparing to export results for participant {participant_id}") try: import sklearn import transformers versions = { "torch": torch.__version__, "sklearn": sklearn.__version__, "transformers": transformers.__version__, "gradio": gr.__version__, } except Exception as e: versions = {"error": f"Could not get versions: {e}"} export_data = [] if GLOBAL_STATE.get("image_paths") and GLOBAL_STATE.get("cluster_labels") is not None: initial_kmeans_labels = GLOBAL_STATE.get("initial_kmeans_labels") initial_label_dict = GLOBAL_STATE.get("initial_label_dict", {}) for i, (img_path, cluster_id) in enumerate(zip(GLOBAL_STATE["image_paths"], GLOBAL_STATE["cluster_labels"])): manually_labeled = any(c[0] == "fixed-cluster" and c[1] == i for c in GLOBAL_STATE.get("constraints", [])) initial_cluster_id = None initial_emotion_label = "" if initial_kmeans_labels is not None and i < len(initial_kmeans_labels): initial_cluster_id = int(initial_kmeans_labels[i]) initial_emotion_label = initial_label_dict.get(str(initial_cluster_id), "") export_data.append({ "image_file": os.path.basename(img_path), "image_path": img_path, "cluster_id": int(cluster_id), "emotion_label": GLOBAL_STATE.get("label_dict", {}).get(str(cluster_id), ""), "initial_kmeans_cluster_id": initial_cluster_id, "initial_kmeans_emotion_label": initial_emotion_label, "manually_labeled": manually_labeled, "boundary_sample": i in GLOBAL_STATE.get("boundary_samples", []) }) json_data = { "metadata": { "participant_id": participant_id, "num_clusters_hitl": GLOBAL_STATE.get("current_k"), "num_clusters_initial": len(initial_label_dict) if initial_label_dict else None, "cluster_labels_initial": initial_label_dict, "cluster_labels": GLOBAL_STATE["label_dict"], "constraints": [{"type": c[0], "sample_idx": c[1], "value": c[2]} for c in GLOBAL_STATE.get("constraints", [])], "evaluation_choice": GLOBAL_STATE.get("evaluation_choice"), "evaluation_layout_is_left_hitl": GLOBAL_STATE.get("evaluation_layout_is_left_hitl"), "annotation_duration_seconds": GLOBAL_STATE.get("annotation_duration_seconds"), "objective_scores": { "hitl_silhouette": GLOBAL_STATE.get("hitl_silhouette_score"), "kmeans_matched_silhouette": GLOBAL_STATE.get("kmeans_silhouette_score"), }, "system_info": { "random_seed_used": GLOBAL_STATE.get("random_seed"), "script_parameters": vars(args), "library_versions": versions, }, }, "samples": export_data } try: with open(pipeline_json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2, cls=NumpyEncoder) logger.info(f"Successfully saved pipeline input to: {pipeline_json_path}") with open(results_json_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2, cls=NumpyEncoder) logger.info(f"Successfully saved final result to: {results_json_path}") except Exception as e: logger.error(f"Failed to write export files: {e}") error_msg = f"An error occurred during file export: {e}
" return gr.update(visible=False), error_msg upload_link = "https://drive.google.com/drive/folders/1ZhsVRTs-Tb_-9mxvIyxAb6K2HbnuRPVw?usp=sharing" status_message = f"""エクスポートが完了しました。/ Export complete.
上のボタンからJSONファイルをダウンロードし、指定された場所にアップロードして実験を終了してください。ご協力ありがとうございました。
Please download the JSON file and upload it to the designated location. Thank you for your cooperation.
アップロード先: {upload_link}
アノテーションを続けてください / Please continue annotating:
すべての境界サンプルの確認が完了すると(「Next →」で最後のサンプルに到達すると)、再クラスタリングボタンが表示されます。
The re-clustering button will appear once you have reviewed all boundary samples (by reaching the last sample with "Next →").
注意 / Attention:
以下の「制約を適用して再クラスタリング」ボタンを押す前に,境界サンプルの割当てがすべて正しく行われているかを確認してください。
Before pressing the 'Apply Constrained Clustering' button below, check that all boundary sample assignments are correct.
Invalid Participant ID / 無効な参加者IDです
The ID must be in the format 'P' followed by exactly two digits (e.g., P01, P23).
IDは「P」に続いて数字2桁の形式である必要があります(例: P01, P23)。
Embedding creation complete. Please proceed to the 'Initial Clustering' tab. / 埋め込みの作成が完了しました。「初期クラスタリング」タブに進んでください。
" yield final_status, gr.update(visible=False), gr.update(value=done_msg, visible=True), gr.update( interactive=True) create_emb_btn.click(handle_create_embeddings_flow, [], [emb_status, create_emb_btn, embedding_done_msg, tab_cluster]) def create_cluster_gallery_html(reps, cid, n_images=1): # repsのキーは整数なので、cidを整数に変換 cid = int(cid) if cid not in reps: return "" html = "No new boundary samples found. You can proceed to the next step. / 新しい境界サンプルは見つかりませんでした。次のステップに進んでください。
" return [ gr.update(value=no_samples_msg), gr.update(visible=False), gr.update(visible=False), gr.update(), gr.update(), ] + [gr.update(visible=False)] * 20 + [ gr.update(visible=False), gr.update(visible=False), gr.update(interactive=False), gr.update(interactive=False), ] img, s_status, _, button_updates = get_current_boundary_sample() total_samples = len(GLOBAL_STATE.get("boundary_samples", [])) return [ status, gr.update(visible=True), gr.update(visible=True, value=img), s_status, gr.update(value=""), ] + button_updates + [ gr.update(visible=False), gr.update(visible=True), gr.update(interactive=False), gr.update(interactive=(total_samples > 1)) ] outputs_for_boundary_tab = [ extract_status, boundary_annotation_group, sample_image, sample_status, new_label_text ] + cluster_buttons + [ recluster_group, recluster_guidance, prev_btn, next_btn ] def move_sample(direction): if not GLOBAL_STATE.get("boundary_samples"): return [gr.update()] * len(outputs_for_boundary_tab) total_samples = len(GLOBAL_STATE['boundary_samples']) current_index = GLOBAL_STATE['boundary_index'] if direction == "prev": new_index = max(0, current_index - 1) else: new_index = min(total_samples, current_index + 1) GLOBAL_STATE["boundary_index"] = new_index if new_index == total_samples: completion_message = """To apply your changes, please press the 'Apply Constrained Clustering' button shown below.
すべてのサンプルの確認が完了しました。変更を適用するには、下に表示されている「制約を適用して再クラスタリング」ボタンを押してください。
再クラスタリングが完了しました。「境界サンプルを抽出」ボタンを再度押して、この手順を繰り返してください。
Re-clustering complete. Please press 'Extract Boundary Samples' again to repeat this procedure.
ラベルの変更はありませんでした。次の「最終化」タブに進んでください。
No labels were changed. Proceed to the 'Finalize' tab.
You chose Result {choice} ({final_choice}). Thank you! Please proceed to export your results. / 結果{choice} ({final_choice}) を選択しました。ありがとうございます。結果をエクスポートしてください。
" return gr.update(value=status_msg, visible=True), gr.update(visible=True), gr.update( interactive=False), gr.update(interactive=False) choose_A_btn.click( partial(handle_evaluation_choice, 'A'), [], [evaluation_status, export_group, choose_A_btn, choose_B_btn] ) choose_B_btn.click( partial(handle_evaluation_choice, 'B'), [], [evaluation_status, export_group, choose_A_btn, choose_B_btn] ) export_btn.click(export_results, [participant_id_input], [download_file, export_status]) return app def get_parameters(): parser = ArgumentParser() parser.add_argument("--dataset_dir", type=str, default="./data/lapwing", help="Path to the root directory of the input dataset.") parser.add_argument("--image_dir", type=str, default="images", help="Subdirectory for images within the dataset directory.") parser.add_argument("--text_dir", type=str, default="texts", help="Subdirectory for text files (like bbox) within the dataset directory.") parser.add_argument("--bbox_json", type=str, default="bounding_boxes.json", help="Name of the bounding box JSON file.") parser.add_argument("--output_dir", type=str, default="./experiments", help="Directory to save the experiment results.") parser.add_argument("--labels_name", type=str, default="HitL_results", help="Prefix for the output JSON filename.") parser.add_argument("--results_dir", type=str, default="./results", help="Directory to save the final human evaluation and annotation results.") try: args = parser.parse_args() except SystemExit: args = parser.parse_args([]) args.image_dir = os.path.join(args.dataset_dir, args.image_dir) args.text_dir = os.path.join(args.dataset_dir, args.text_dir) args.bbox_json = os.path.join(args.text_dir, args.bbox_json) return args def main(): set_random_seed() app = create_gradio_interface() app.launch(share=True) def set_random_seed(): seed = GLOBAL_STATE["random_seed"] np.random.seed(seed) torch.manual_seed(seed) random.seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False logger.info(f"Set random seed to {seed}") if __name__ == "__main__": main()