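# NOTE(review): "Spaces: Runtime error / Runtime error" -- this looks like page-status
# text captured together with the file rather than module content; kept as a comment.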
import glob | |
import json | |
import math | |
import os | |
from dataclasses import dataclass | |
from src.display.formatting import make_clickable_model | |
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType | |
from src.submission.check_validity import is_model_on_hub | |
from src.evaluation.model_trace_eval import compute_model_trace_p_value | |
from src.evaluation.initialize_models import is_model_allowed | |
@dataclass
class EvalResult:
    """Represents one perplexity evaluation result.

    Instances are normally built from a result JSON file via
    :meth:`init_from_json_file` and turned into a leaderboard row via
    :meth:`to_dict`. The class must be a dataclass: the alternate constructor
    instantiates it with keyword arguments.
    """

    eval_name: str  # org_model_precision (uid)
    full_model: str  # org/model (path on hub)
    org: str  # None when the model name carries no "org/" prefix
    model: str
    revision: str  # commit hash, "" if main
    results: dict  # left empty by init_from_json_file (only p-values are displayed)
    precision: Precision = Precision.Unknown
    model_type: ModelType = ModelType.PT  # Default to pretrained
    weight_type: WeightType = WeightType.Original
    architecture: str = "Unknown"
    still_on_hub: bool = False

    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Build an ``EvalResult`` from a single model result file.

        Args:
            json_filepath: Path to a JSON file holding a ``"config"`` object
                with ``model_dtype``, ``model_name`` (or ``model_args``) and
                optionally ``model_sha``.

        Returns:
            A new ``EvalResult`` whose ``results`` dict is empty.

        Raises:
            ValueError: If the file has no ``"config"`` object or the config
                names no model.
        """
        with open(json_filepath, encoding="utf-8") as fp:
            data = json.load(fp)

        config = data.get("config")
        if not isinstance(config, dict):
            raise ValueError(f"Result file {json_filepath} has no 'config' section")

        # Precision
        precision = Precision.from_str(config.get("model_dtype"))

        # Get model and org; "model_args" is the fallback key for the model name.
        model_name = config.get("model_name", config.get("model_args", None))
        if not model_name:
            raise ValueError(f"Result file {json_filepath} does not name a model in its config")

        org_and_model = model_name.split("/", 1)
        if len(org_and_model) == 1:
            org = None
            model = org_and_model[0]
            result_key = f"{model}_{precision.value.name}"
        else:
            org, model = org_and_model
            result_key = f"{org}_{model}_{precision.value.name}"
        full_model = "/".join(org_and_model)

        still_on_hub, _, model_config = is_model_on_hub(
            full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
        )

        # "?" marks an architecture that could not be read from the hub config.
        architecture = "?"
        if model_config is not None:
            architectures = getattr(model_config, "architectures", None)
            if architectures:
                architecture = ";".join(architectures)

        # No perplexity extraction - we only care about p-values
        results = {}

        return cls(
            eval_name=result_key,
            full_model=full_model,
            org=org,
            model=model,
            results=results,
            precision=precision,
            revision=config.get("model_sha", ""),
            still_on_hub=still_on_hub,
            architecture=architecture,
        )

    def to_dict(self):
        """Convert the result to a dict compatible with the dataframe display.

        Only core model columns and the model trace p-value are emitted; no
        per-task values are processed. Progress is logged to stderr.

        Returns:
            A dict keyed by ``"eval_name"`` and ``AutoEvalColumn`` column
            names. The p-value entry is ``None`` when its computation returns
            ``None`` or raises.
        """
        import sys

        sys.stderr.write("\n=== PROCESSING RESULT TO_DICT (P-VALUES ONLY) ===\n")
        sys.stderr.write(f"Processing result for model: {self.full_model}\n")
        sys.stderr.flush()

        # Core columns - NO TASK PROCESSING AT ALL
        data_dict = {
            "eval_name": self.eval_name,
            AutoEvalColumn.precision.name: self.precision.value.name,
            AutoEvalColumn.model_type.name: self.model_type.value.name,
            AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol,
            AutoEvalColumn.weight_type.name: self.weight_type.value.name,
            AutoEvalColumn.architecture.name: self.architecture,
            AutoEvalColumn.model.name: make_clickable_model(self.full_model),
            AutoEvalColumn.revision.name: self.revision,
            AutoEvalColumn.still_on_hub.name: self.still_on_hub,
        }

        # Placeholder values for model info that is not collected here
        data_dict[AutoEvalColumn.license.name] = "Unknown"
        data_dict[AutoEvalColumn.params.name] = 0
        data_dict[AutoEvalColumn.likes.name] = 0

        # Compute model trace p-value
        sys.stderr.write(f"🧬 COMPUTING MODEL TRACE P-VALUE FOR: {self.full_model}\n")
        sys.stderr.flush()

        try:
            model_trace_p_value = compute_model_trace_p_value(
                self.full_model,
                self.revision if self.revision else "main",
                self.precision.value.name.lower(),
            )
            if model_trace_p_value is not None:
                sys.stderr.write(f"✅ P-value: {model_trace_p_value}\n")
            else:
                sys.stderr.write("⚠️ P-value computation failed\n")
        except Exception as e:
            # Best effort: a failed computation yields an empty cell, not a crash.
            sys.stderr.write(f"💥 Exception during p-value computation: {e}\n")
            model_trace_p_value = None

        data_dict[AutoEvalColumn.model_trace_p_value.name] = model_trace_p_value

        sys.stderr.write("=== END PROCESSING - ONLY P-VALUES ===\n")
        sys.stderr.flush()

        return data_dict
def get_raw_eval_results(results_path: str) -> list[EvalResult]: | |
"""From the path of the results folder root, extract all perplexity results""" | |
import sys | |
sys.stderr.write(f"\nSearching for result files in: {results_path}\n") | |
sys.stderr.flush() | |
model_result_filepaths = [] | |
for root, _, files in os.walk(results_path): | |
# Process all JSON files, regardless of other files in the directory | |
for file in files: | |
if file.endswith(".json"): | |
model_result_filepaths.append(os.path.join(root, file)) | |
sys.stderr.write(f"Found {len(model_result_filepaths)} result files\n") | |
sys.stderr.flush() | |
eval_results = {} | |
for model_result_filepath in model_result_filepaths: | |
try: | |
sys.stderr.write(f"\nProcessing file: {model_result_filepath}\n") | |
sys.stderr.flush() | |
# Quick pre-check: Try to extract model name from file before full processing | |
try: | |
with open(model_result_filepath, 'r') as f: | |
data = json.load(f) | |
config = data.get("config", {}) | |
model_name = config.get("model_name", "") | |
if model_name and not is_model_allowed(model_name): | |
sys.stderr.write(f"⏭️ Skipping non-allowed model file: {model_result_filepath} (model: {model_name})\n") | |
sys.stderr.flush() | |
continue | |
except Exception as e: | |
sys.stderr.write(f"⚠️ Error pre-checking file {model_result_filepath}: {e}\n") | |
sys.stderr.flush() | |
continue | |
# Creation of result | |
eval_result = EvalResult.init_from_json_file(model_result_filepath) | |
sys.stderr.write(f"Created result object for: {eval_result.full_model}\n") | |
sys.stderr.flush() | |
# Store results of same eval together | |
eval_name = eval_result.eval_name | |
if eval_name in eval_results.keys(): | |
eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None}) | |
sys.stderr.write(f"Updated existing result for {eval_name}\n") | |
sys.stderr.flush() | |
else: | |
eval_results[eval_name] = eval_result | |
sys.stderr.write(f"Added new result for {eval_name}\n") | |
sys.stderr.flush() | |
except Exception as e: | |
sys.stderr.write(f"Error processing result file {model_result_filepath}: {e}\n") | |
import traceback | |
sys.stderr.write(f"Traceback: {traceback.format_exc()}\n") | |
sys.stderr.flush() | |
continue | |
results = [] | |
sys.stderr.write(f"\nProcessing {len(eval_results)} evaluation results\n") | |
sys.stderr.flush() | |
for v in eval_results.values(): | |
try: | |
sys.stderr.write(f"\nConverting result to dict for: {v.full_model}\n") | |
sys.stderr.flush() | |
# Filter to only allowed models | |
if not is_model_allowed(v.full_model): | |
sys.stderr.write(f"⏭️ Skipping non-allowed model: {v.full_model}\n") | |
sys.stderr.flush() | |
continue | |
v.to_dict() # we test if the dict version is complete | |
results.append(v) | |
sys.stderr.write("Successfully converted and added result\n") | |
sys.stderr.flush() | |
except KeyError as e: | |
sys.stderr.write(f"Error converting result to dict: {e}\n") | |
import traceback | |
sys.stderr.write(f"Traceback: {traceback.format_exc()}\n") | |
sys.stderr.flush() | |
continue | |
sys.stderr.write(f"\nReturning {len(results)} processed results\n") | |
sys.stderr.flush() | |
return results | |