import json
import os
import queue
import tempfile
import threading
from datetime import datetime, timezone
from typing import List

import gradio as gr
import numpy as np
from sklearn.metrics import confusion_matrix

from src.display.formatting import styled_message
from src.envs import API, EVAL_RESULTS_PATH, RESULTS_REPO

GROUND_TRUTH_DATA = None
TASKS_QUEUE = queue.Queue()

# Merge the raw class ids into the 11 evaluation classes;
# ids 12, 13 and 255 map to 255, which marks ignored pixels.
remap_dict = {
    0: 0, 1: 0, 2: 0, 3: 0, 4: 0,
    5: 1,
    6: 2, 7: 2,
    8: 3, 9: 3, 10: 3, 11: 3,
    14: 4,
    15: 5,
    16: 6,
    17: 7, 18: 7,
    19: 8,
    20: 9,
    21: 10,
    12: 255, 13: 255, 255: 255,
}

# Build a 256-entry lookup table so remapping a whole label array is a
# single vectorized indexing operation; any id not listed above falls
# through to 255 (ignore).
lut = np.full(256, 255, dtype=np.uint8)
for k, v in remap_dict.items():
    lut[k] = v
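
# Illustrative example of the vectorized remap (not part of the pipeline):
#   >>> lut[np.array([0, 5, 12, 21])]
#   array([  0,   1, 255,  10], dtype=uint8)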


def queue_worker():
    """Background loop: take submissions from TASKS_QUEUE, score them, and upload the result file to the results repo."""
    print("queue_worker")
    while True:
        print("Waiting for data from queue")
        las_files, user_name, result_name, current_time, remap = TASKS_QUEUE.get()
        print(f"Compute stats {user_name} {result_name} {current_time}")
        try:
            metrics = eval_las_files(las_files, remap)

            eval_entry = {
                "result_name": result_name,
                "submitted_time": current_time,
            } | metrics

            print("Creating eval file")
            out_dir = f"{EVAL_RESULTS_PATH}/{user_name}"
            os.makedirs(out_dir, exist_ok=True)
            out_path = f"{out_dir}/{result_name}_result.json"

            print(out_path)
            with open(out_path, "w") as f:
                f.write(json.dumps(eval_entry))

            print("Uploading eval file")
            print(out_path.split("eval-results/")[1])
            API.upload_file(
                path_or_fileobj=out_path,
                # Upload under the path relative to the local eval-results checkout.
                path_in_repo=out_path.split("eval-results/")[1],
                repo_id=RESULTS_REPO,
                repo_type="dataset",
                commit_message=f"Add {user_name}/{result_name} to results",
            )
        except Exception as e:
            # Keep the worker alive if one submission fails to evaluate or upload.
            print(f"Failed to process {user_name}/{result_name}: {e}")
        finally:
            TASKS_QUEUE.task_done()


# A single daemon worker processes submissions sequentially; daemon=True
# means it will not block interpreter shutdown.
WORKER_THREAD = threading.Thread(target=queue_worker, daemon=True)
WORKER_THREAD.start()


def handle_file_path(path: str) -> str:
    """Return the base file name without its extension; used as the area key."""
    return os.path.splitext(os.path.basename(path))[0]


def read_ground_truth():
    """Load every ground-truth .npz from EVAL_RESULTS_PATH/ground_truth, remapped through the LUT and keyed by area name."""
    print("read_ground_truth")
    global GROUND_TRUTH_DATA
    GROUND_TRUTH_DATA = {}
    directory_path = os.path.join(EVAL_RESULTS_PATH, "ground_truth")

    for filename in os.listdir(directory_path):
        if filename.endswith(".npz"):
            file_path = os.path.join(directory_path, filename)
            with np.load(file_path) as data:
                GROUND_TRUTH_DATA[os.path.splitext(filename)[0]] = lut[data["data"]]


def compute_metrics_from_cm(cm: np.ndarray):
    """Derive classification metrics from a confusion matrix (rows = ground truth, columns = predictions)."""
    tp = np.diag(cm)
    fp = np.sum(cm, axis=0) - tp
    fn = np.sum(cm, axis=1) - tp

    total = np.sum(cm)
    accuracy = np.sum(tp) / total if total != 0 else 0.0

    # Per-class scores; np.divide with `where` leaves 0.0 for classes that
    # never occur, instead of dividing by zero.
    precision = np.divide(tp, tp + fp, out=np.zeros_like(tp, dtype=np.float64), where=(tp + fp) != 0)
    recall = np.divide(tp, tp + fn, out=np.zeros_like(tp, dtype=np.float64), where=(tp + fn) != 0)
    f1 = np.divide(
        2 * precision * recall,
        precision + recall,
        out=np.zeros_like(tp, dtype=np.float64),
        where=(precision + recall) != 0,
    )

    precision_macro = np.mean(precision)
    recall_macro = np.mean(recall)
    f1_macro = np.mean(f1)

    # Intersection over union per class and its mean.
    denom = tp + fp + fn
    iou = np.divide(tp, denom, out=np.zeros_like(tp, dtype=np.float64), where=denom != 0)
    miou = np.mean(iou)

    # Producer's accuracy (tp / row sum) is per-class recall and user's
    # accuracy (tp / column sum) is per-class precision; both are reported
    # under their remote-sensing names.
    row_sums = cm.sum(axis=1)
    col_sums = cm.sum(axis=0)
    pa = np.divide(tp, row_sums, out=np.zeros_like(tp, dtype=np.float64), where=row_sums != 0)
    ua = np.divide(tp, col_sums, out=np.zeros_like(tp, dtype=np.float64), where=col_sums != 0)

    return {
        "accuracy": accuracy,
        "precision_macro": precision_macro,
        "recall_macro": recall_macro,
        "f1_macro": f1_macro,
        "precision_per_class": precision.tolist(),
        "recall_per_class": recall.tolist(),
        "f1_per_class": f1.tolist(),
        "iou": iou.tolist(),
        "miou": miou,
        "producer_accuracy": pa.tolist(),
        "user_accuracy": ua.tolist(),
        "confusion_matrix": cm.tolist(),
    }
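
# A minimal sanity check (illustrative, not part of the module): for the
# 2-class confusion matrix [[5, 1], [2, 7]], accuracy is (5 + 7) / 15 = 0.8,
# class-0 precision is 5 / (5 + 2) and class-0 IoU is 5 / (5 + 1 + 2).
#   >>> m = compute_metrics_from_cm(np.array([[5, 1], [2, 7]]))
#   >>> round(m["accuracy"], 3)
#   0.8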


def eval_las_files(npz_file_paths: List[str], remap=False):
    """Compare predicted label arrays against the ground truth and return metrics aggregated over all submitted areas."""
    NUM_CLASSES = 11
    LABELS = list(range(NUM_CLASSES))
    global_cm = np.zeros((NUM_CLASSES, NUM_CLASSES), dtype=np.int64)

    if GROUND_TRUTH_DATA is None:
        read_ground_truth()

    for file_path in npz_file_paths:
        print("Reading file:", file_path)
        area = handle_file_path(file_path)
        if area not in GROUND_TRUTH_DATA:
            print(f"Error: {area} is not a known area!")
            continue

        with np.load(file_path) as data:
            y_pred = data["data"]
        if remap:
            y_pred = lut[y_pred]
        y_true = GROUND_TRUTH_DATA[area]
        if y_true.shape != y_pred.shape:
            print(f"Error: {area} prediction and ground truth have different shapes: {y_true.shape=} {y_pred.shape=}")
            continue

        # 255 marks pixels to ignore in the ground truth.
        valid = y_true != 255

        cm = confusion_matrix(y_true[valid], y_pred[valid], labels=LABELS)
        global_cm += cm

    return compute_metrics_from_cm(global_cm)
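
# Illustrative call (the file name is hypothetical; its stem must match a
# known ground-truth area for the file to be scored):
#   metrics = eval_las_files(["predictions/area_1.npz"], remap=True)
#   print(metrics["miou"])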


def add_new_eval(
    result_name: str,
    npz_files: List[tempfile._TemporaryFileWrapper],
    remap: bool,
    profile: gr.OAuthProfile | None,
) -> str:
    """Validate a submission and enqueue it for background evaluation."""
    if profile is None:
        return styled_message("⚠️ Please log in to submit your evaluation.")

    if not result_name or not npz_files:
        return styled_message("❌ Please fill in all fields and upload at least one NPZ file.")

    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    print("Adding new eval to the tasks queue")

    # Copy the uploads out of Gradio's temporary directory so they still
    # exist when the worker thread picks the task up.
    saved_files = []
    for file in npz_files:
        new_path = os.path.join(tempfile.gettempdir(), os.path.basename(file.name))
        with open(new_path, "wb") as out_file, open(file.name, "rb") as in_file:
            out_file.write(in_file.read())
        saved_files.append(new_path)

    TASKS_QUEUE.put((saved_files, profile.username, result_name, current_time, remap))

    return styled_message("✅ Your request has been added! The leaderboard will update after processing.")