# Mezura - src/utils.py
import pandas as pd
import json
import os
import glob
import gradio as gr
import traceback
import re
import plotly.express as px
import plotly.graph_objects as go
from src.envs import API, TOKEN, REPO_ID
import requests
import logging
from datetime import datetime
from dotenv import load_dotenv
from utils.rag_score_calculator import RAGScoreCalculator
# NOTE: AutoEvalColumn is referenced by select_columns/filter_queries/filter_models below but was
# not imported in the original module; src.display.utils is an assumed location for it.
try:
    from src.display.utils import AutoEvalColumn
except ImportError:
    AutoEvalColumn = None
# Logger setup
logger = logging.getLogger("mezura.utils")
# Setup a dedicated logger for tracking model submissions
submission_logger = logging.getLogger("mezura.submissions")
submission_handler = logging.FileHandler("submissions.log")
submission_formatter = logging.Formatter('%(asctime)s - %(message)s')
submission_handler.setFormatter(submission_formatter)
submission_logger.addHandler(submission_handler)
submission_logger.setLevel(logging.INFO)
# Model metadata lookup table - centralized for all benchmark functions
MODEL_METADATA_LOOKUP = {
"mistralai/Magistral-Small-2506": {"license": "Apache 2.0", "dtype": "bfloat16"},
"newmindai/Qwen2.5-72B-Instruct": {"license": "Qwen", "dtype": "bfloat16"},
"Qwen/Qwen2.5-72B-Instruct": {"license": "Qwen", "dtype": "bfloat16"},
"deepseek-ai/DeepSeek-R1": {"license": "MIT", "dtype": "bfloat16"},
"Qwen/Qwen3-32B": {"license": "Qwen", "dtype": "bfloat16"},
"newmindai/QwQ-32B-r1": {"license": "Apache 2.0", "dtype": "bfloat16"},
"google/gemma-3-27b-it": {"license": "Gemma", "dtype": "bfloat16"},
"Qwen/Qwen3-14B": {"license": "Apache 2.0", "dtype": "bfloat16"},
"newmindai/Llama-3.3-70b-Instruct": {"license": "Llama-3.3", "dtype": "bfloat16"},
"Qwen/QwQ-32B": {"license": "Apache 2.0", "dtype": "bfloat16"},
"microsoft/phi-4": {"license": "MIT", "dtype": "bfloat16"},
"meta-llama/Meta-Llama-3.1-70B-Instruct": {"license": "Llama 3.1", "dtype": "bfloat16"},
"grok-3": {"license": "Proprietary", "dtype": "Unknown"},
"grok-3-mini-fast": {"license": "Proprietary", "dtype": "Unknown"},
"meta-llama/Llama-3.3-70B-Instruct": {"license": "Llama-3.3", "dtype": "bfloat16"},
"meta-llama/Llama-3.3-70b-Instruct": {"license": "Llama 3.3", "dtype": "bfloat16"}, # lowercase b variant
"newmindai/Qwen2.5-72b-Instruct": {"license": "Qwen", "dtype": "bfloat16"}, # lowercase b variant
"grok-3-mini-fast-beta": {"license": "Proprietary", "dtype": "Unknown"}, # beta variant
# Legacy entries for backward compatibility
"deepseek-r1-distill-llama-70b": {"license": "MIT", "dtype": "bfloat16"},
"qwen-qwq-32b": {"license": "Apache 2.0", "dtype": "bfloat16"}
}
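# NOTE: the table builders below consult MODEL_METADATA_LOOKUP with the *raw* repo id exactly as it
# appears in the result JSON (i.e. before format_model_name is applied); models missing from the
# table fall back to dtype "unknown" and license "Unknown".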
def log_model_submission(repo_id, base_model):
"""
Logs model submission details to a dedicated log file
Args:
repo_id: The repository ID of the model
base_model: The base model used
"""
submission_logger.info(f"SUBMISSION - REPO_ID: {repo_id}, BASE_MODEL: {base_model}")
def restart_space():
try:
if API is not None:
API.restart_space(repo_id=REPO_ID, token=TOKEN)
else:
print("Warning: API is None, cannot restart space")
except Exception as e:
print(f"Warning: Could not restart space: {e}")
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
# Always include model and model_type_symbol columns
selected_columns = [AutoEvalColumn.model_type_symbol.name, AutoEvalColumn.model.name]
# Add selected columns
for column in columns:
if column in df.columns:
selected_columns.append(column)
# Add dummy column for search
selected_columns.append(AutoEvalColumn.dummy.name)
return df[selected_columns]
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
if not query:
return filtered_df
# Split query by ; and filter for each part
queries = query.split(";")
filtered_dfs = []
for q in queries:
q = q.strip()
if not q:
continue
filtered_dfs.append(filtered_df[filtered_df[AutoEvalColumn.dummy.name].str.contains(q, case=False)])
if not filtered_dfs:
return filtered_df
# Combine all filtered dataframes
return pd.concat(filtered_dfs).drop_duplicates()
def filter_models(
df: pd.DataFrame
) -> pd.DataFrame:
# Show all models
filtered_df = df.copy()
# Always filter out deleted models
filtered_df = filtered_df[filtered_df[AutoEvalColumn.still_on_hub.name]]
return filtered_df
# Functions for loading and displaying benchmark results
def load_benchmark_results():
"""
Load benchmark results from local files
"""
results = {
"avg": {
"evalmix": [],
"light_eval": [],
"snake": [],
"retrieval": [],
"arena": [],
"human_arena": []
},
"raw": {
"evalmix": [],
"light_eval": [],
"snake": [],
"retrieval": [],
"arena": [],
"human_arena": []
}
}
# Define benchmark types to look for
benchmark_types = ["evalmix", "light_eval", "snake", "retrieval", "arena", "human_arena"] # "lm_harness" removed
# Initialize RAG Score calculator for runtime calculation
rag_calculator = None
rag_scores_cache = {} # Cache for RAG scores by run_id
try:
rag_calculator = RAGScoreCalculator()
if rag_calculator.stats:
logger.info("RAG Score calculator initialized successfully")
# Pre-calculate RAG scores from detail files
for data in rag_calculator.all_data:
run_id = data.get('run_id')
if run_id:
rag_score = rag_calculator.calculate_rag_score(data)
rag_scores_cache[run_id] = rag_score
logger.info(f"Pre-calculated {len(rag_scores_cache)} RAG scores")
else:
logger.warning("No RAG statistics available for score calculation")
except Exception as e:
logger.warning(f"Could not initialize RAG Score calculator: {e}")
rag_calculator = None
# Load raw JSON files (detailed results)
for benchmark_type in benchmark_types:
dir_path = f"result/{benchmark_type}"
# if benchmark_type == "lm_harness" and not os.path.exists(dir_path):
# dir_path = "result/lmharness"
# Skip if directory doesn't exist
if not os.path.exists(dir_path):
continue
# Load avg files for leaderboard
avg_files = glob.glob(f"{dir_path}/avg_*.json")
for file in avg_files:
try:
with open(file, "r") as f:
data = json.load(f)
# Handle different data formats
if isinstance(data, list):
# If data is a list, convert it to a dictionary
if benchmark_type == "arena" and len(data) > 0:
# For arena, create a dictionary with model_name
processed_data = {
"model_name": f"Model {os.path.basename(file).replace('avg_', '').split('.')[0]}",
"file": os.path.basename(file)
}
# Add metrics from the list if available
if len(data) > 0:
for i, item in enumerate(data):
if isinstance(item, dict):
for key, value in item.items():
processed_data[f"item_{i}_{key}"] = value
data = processed_data
else:
# For other types, create a dictionary with model_name
data = {"model_name": f"Model {os.path.basename(file).replace('avg_', '').split('.')[0]}"}
else:
# Ensure data is a dictionary
if not isinstance(data, dict):
data = {"model_name": f"Model {os.path.basename(file).replace('avg_', '').split('.')[0]}"}
# Add file information
data["file"] = os.path.basename(file)
# Ensure model_name exists
if "model_name" not in data or not data["model_name"]:
# Extract model ID from filename
file_name = os.path.basename(file)
model_id = file_name.replace("avg_", "").split(".")[0]
data["model_name"] = f"Model {model_id}"
# Format the model name nicely for display
if "model_name" in data:
data["model_name"] = format_model_name(data["model_name"])
# Add pre-calculated RAG Score for retrieval data (from detail files cache)
if benchmark_type == "retrieval" and rag_scores_cache:
run_id = data.get('run_id')
if run_id and run_id in rag_scores_cache:
data["RAG_score"] = rag_scores_cache[run_id]
logger.debug(f"Added cached RAG_score {rag_scores_cache[run_id]} for avg file {data.get('model_name', 'unknown')}")
else:
logger.debug(f"No cached RAG_score found for run_id: {run_id}")
results["avg"][benchmark_type].append(data)
except Exception as e:
print(f"Error loading {benchmark_type} avg file: {file} - {e}")
# Load detail files for pipeline-specific views
detail_files = glob.glob(f"{dir_path}/detail_*.json")
for file in detail_files:
try:
with open(file, "r") as f:
data = json.load(f)
# Handle different data formats
if isinstance(data, list):
# If data is a list, convert it to a dictionary
if benchmark_type == "arena" and len(data) > 0:
# For arena, create a dictionary with model_name
processed_data = {
"model_name": f"Model {os.path.basename(file).replace('detail_', '').split('.')[0]}",
"file": os.path.basename(file)
}
# Add metrics from the list if available
if len(data) > 0:
for i, item in enumerate(data):
if isinstance(item, dict):
for key, value in item.items():
processed_data[f"item_{i}_{key}"] = value
data = processed_data
else:
# For other types, create a dictionary with model_name
data = {"model_name": f"Model {os.path.basename(file).replace('detail_', '').split('.')[0]}"}
else:
# Ensure data is a dictionary
if not isinstance(data, dict):
data = {"model_name": f"Model {os.path.basename(file).replace('detail_', '').split('.')[0]}"}
# Add file information
data["file"] = os.path.basename(file)
# Ensure model_name exists
if "model_name" not in data or not data["model_name"]:
# Extract model ID from filename
file_name = os.path.basename(file)
model_id = file_name.replace("detail_", "").split(".")[0]
data["model_name"] = f"Model {model_id}"
# Format the model name nicely for display
if "model_name" in data:
data["model_name"] = format_model_name(data["model_name"])
# Add pre-calculated RAG Score for retrieval data (from cache)
if benchmark_type == "retrieval" and rag_scores_cache:
run_id = data.get('run_id')
if run_id and run_id in rag_scores_cache:
data["RAG_score"] = rag_scores_cache[run_id]
logger.debug(f"Added cached RAG_score {rag_scores_cache[run_id]} for detail file {data.get('model_name', 'unknown')}")
else:
logger.debug(f"No cached RAG_score found for detail run_id: {run_id}")
results["raw"][benchmark_type].append(data)
# Also add to default results to ensure we have all models in the leaderboard
# This ensures that models from detail files are also shown in the leaderboard
# Create a simplified version with just the model name and basic metrics
simplified_data = {"model_name": data["model_name"], "file": data["file"]}
# Extract key metrics based on benchmark type
if benchmark_type == "retrieval":
# For RAG Judge, extract RAG_score, RAG_success_rate and average_judge_score if available
# RAG_score should be available since we just calculated it above
if "RAG_score" in data:
simplified_data["RAG_score"] = data["RAG_score"]
if "RAG_success_rate" in data:
simplified_data["RAG_success_rate"] = data["RAG_success_rate"]
if "average_judge_score" in data:
simplified_data["average_judge_score"] = data["average_judge_score"]
# Add to default results if not already present
if not any(item.get("model_name") == data["model_name"] for item in results["avg"][benchmark_type]):
results["avg"][benchmark_type].append(simplified_data)
except Exception as e:
print(f"Error loading {benchmark_type} detail file: {file} - {e}")
return results
def format_model_name(model_name):
"""
Formats model names for better display in leaderboards:
- Replaces underscores with spaces
- Preserves original casing
Args:
model_name: Original model name string
Returns:
str: Formatted model name
"""
if not model_name:
return model_name
# Split model name by organization/model if present
if "/" in model_name:
org, name = model_name.split("/", 1)
# Format the model part only - replace underscores with spaces but preserve casing
formatted_name = name.replace("_", " ")
return f"{org}/{formatted_name}"
else:
# Format the whole name - replace underscores with spaces but preserve casing
return model_name.replace("_", " ")
def create_evalmix_table(data):
"""
    Creates a table from Hybrid (EvalMix) benchmark results
"""
if not data:
return pd.DataFrame()
# Apply model name formatting and add metadata from lookup table
for item in data:
if "model_name" in item:
raw_model_name = item["model_name"]
item["model_name"] = format_model_name(raw_model_name)
# Always use lookup table values for metadata (override JSON values)
for field in ["dtype", "license"]:
if raw_model_name in MODEL_METADATA_LOOKUP:
item[field] = MODEL_METADATA_LOOKUP[raw_model_name][field]
else:
defaults = {"dtype": "unknown", "license": "Unknown"}
item[field] = defaults[field]
df = pd.DataFrame(data)
# Remove the file column if present
if 'file' in df.columns:
df = df.drop(columns=['file'])
# Remove all sample count columns
sample_columns = ["total_samples", "Total Samples", "samples_number"]
for col in sample_columns:
if col in df.columns:
df = df.drop(columns=[col])
if "model_name" in df.columns:
df = df.sort_values(by="model_name")
    # Add an average score - handles the case when judge_metric is not available
if all(col in df.columns for col in ["lexical_metric", "semantic_metric"]):
if "judge_metric" in df.columns:
df["average_score"] = df[["lexical_metric", "semantic_metric", "judge_metric"]].mean(axis=1).round(2)
else:
df["average_score"] = df[["lexical_metric", "semantic_metric"]].mean(axis=1).round(2)
    # Round float values to 2 decimal places
    for column in df.columns:
        try:
            if pd.api.types.is_float_dtype(df[column]):
                df[column] = df[column].round(2)
        except Exception:
            continue
# Format column names for better display
column_mapping = {}
for col in df.columns:
# Skip already well-formatted columns
if col == "model_name":
column_mapping[col] = "Model Name"
continue
# Special handling for Turkish and Multilingual Semantic
if "turkish_semantic" in col.lower():
column_mapping[col] = "Turkish Semantic"
continue
if "multilingual_semantic" in col.lower():
column_mapping[col] = "Multilingual Semantic"
continue
# Special handling for certain columns
if col == "average_score":
column_mapping[col] = "Average Score"
continue
if col == "lexical_metric":
column_mapping[col] = "Lexical Score"
continue
if col == "semantic_metric":
column_mapping[col] = "Semantic Score"
continue
if col == "judge_metric":
column_mapping[col] = "Judge Score"
continue
if col == "openai_accuracy":
column_mapping[col] = "OpenAI Accuracy"
continue
if col == "dtype":
column_mapping[col] = "Dtype"
continue
if col == "license":
column_mapping[col] = "License"
continue
# Format column name
formatted_col = " ".join([word.capitalize() for word in col.replace("_", " ").split()])
column_mapping[col] = formatted_col
# Rename DataFrame columns
df = df.rename(columns=column_mapping)
    # Sort by Turkish Semantic if present (display name first, then the raw column name)
if "Turkish Semantic" in df.columns:
df = df.sort_values(by="Turkish Semantic", ascending=False)
elif "turkish_semantic" in df.columns:
df = df.sort_values(by="turkish_semantic", ascending=False)
# Define desired column order for EvalMix - metadata columns at the end
desired_cols = [
"Model Name",
"Turkish Semantic",
"Multilingual Semantic",
"Average Score",
"Lexical Score",
"Semantic Score",
"Judge Score",
"OpenAI Accuracy",
"Dtype",
"License"
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Add any remaining columns that weren't in the desired list
remaining_cols = [col for col in df.columns if col not in final_cols]
final_cols.extend(remaining_cols)
# Set the new column order
df = df[final_cols]
return df
def create_light_eval_table(data, is_detail=False):
"""
Creates a table from Light Eval results
Args:
data: Light eval data
is_detail: If True, keep 4 decimal places for detail results
"""
if not data:
return pd.DataFrame()
    # Light Eval results arrive in a different format and need reshaping
formatted_data = []
for item in data:
        model_data = {"model_name": format_model_name(item.get("model_name", "Unknown Model"))}
# Add specific metrics we're interested in
metrics = [
"overall_average",
"mmlu_average",
"truthfulqa",
"winogrande",
"hellaswag",
"gsm8k",
"arc_challenge",
"dtype",
"license"
# Removed total_samples
]
for metric in metrics:
try:
if metric in ["dtype", "license"]:
# Always use lookup table for metadata (override JSON values)
raw_model_name = item.get("model_name", "")
if raw_model_name in MODEL_METADATA_LOOKUP:
model_data[metric] = MODEL_METADATA_LOOKUP[raw_model_name][metric]
else:
# Default values for unknown models
defaults = {"dtype": "unknown", "license": "Unknown"}
model_data[metric] = defaults[metric]
elif metric in item:
if metric == "overall_average" and item[metric] == "N/A":
model_data[metric] = "N/A"
elif isinstance(item[metric], str) and item[metric] != "N/A":
model_data[metric] = float(item[metric])
else:
model_data[metric] = item[metric]
else:
model_data[metric] = "N/A"
except Exception as e:
if metric in ["dtype", "license"]:
defaults = {"dtype": "unknown", "license": "Unknown"}
model_data[metric] = defaults[metric]
else:
model_data[metric] = item.get(metric, "N/A")
formatted_data.append(model_data)
# Create DataFrame
df = pd.DataFrame(formatted_data)
# Remove the file column if present
if 'file' in df.columns:
df = df.drop(columns=['file'])
# Try to convert metrics to float with error handling (only numeric columns)
numeric_cols = ["overall_average", "mmlu_average", "truthfulqa", "winogrande", "hellaswag", "gsm8k", "arc_challenge"]
for col in numeric_cols:
if col in df.columns:
try:
# Convert column to float but keep "N/A" as is
df[col] = df[col].apply(lambda x: float(x) if isinstance(x, (int, float)) or (isinstance(x, str) and x != "N/A") else x)
except Exception as e:
pass # Keep original values if conversion fails
# Sort by overall_average if available
if "overall_average" in df.columns:
# For sorting, replace non-numeric values with NaN temporarily
sort_col = pd.to_numeric(df["overall_average"], errors="coerce")
# Sort with NaN at the end
df = df.iloc[sort_col.fillna(-1).argsort(kind="stable").iloc[::-1]]
    # Round float values - 4 decimal places for detail results, 2 for avg
    decimal_places = 4 if is_detail else 2
    for column in df.columns:
        try:
            if pd.api.types.is_float_dtype(df[column]):
                df[column] = df[column].round(decimal_places)
        except Exception:
            continue
# Format column names according to user request
column_mapping = {
"model_name": "Model Name",
"overall_average": "Overall",
"mmlu_average": "MMLU",
"truthfulqa": "Truthfulqa",
"winogrande": "Winogrande",
"hellaswag": "Hellaswag",
"gsm8k": "Gsm8k",
"arc_challenge": "ARC",
"dtype": "Dtype",
"license": "License"
}
# Rename DataFrame columns
df = df.rename(columns=column_mapping)
# Define desired column order for Light-Eval - metadata columns at the end
desired_cols = [
"Model Name",
"Overall",
"MMLU",
"Truthfulqa",
"Winogrande",
"Hellaswag",
"Gsm8k",
"ARC",
"Dtype",
"License"
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Add any remaining columns that weren't in the desired list
remaining_cols = [col for col in df.columns if col not in final_cols]
final_cols.extend(remaining_cols)
# Set the new column order
df = df[final_cols]
return df
def create_benchmark_plots(benchmark_data, data_type="avg"):
"""
    Creates plots from the benchmark data
    Args:
        benchmark_data: Benchmark data
        data_type: Either "avg" or "raw"
"""
plots = {}
    # Bar chart for the Hybrid benchmark
if benchmark_data[data_type]["evalmix"]:
df = create_evalmix_table(benchmark_data[data_type]["evalmix"])
        # create_evalmix_table has already renamed the columns, so check for the display names
        if not df.empty and all(col in df.columns for col in ["Model Name", "Lexical Score", "Semantic Score"]):
            # Determine which metrics are available
            metrics = ["Lexical Score", "Semantic Score"]
            if "Judge Score" in df.columns:
                metrics.append("Judge Score")
            # Reshape the data into long format for grouped bars
            plot_df = pd.melt(
                df,
                id_vars=["Model Name"],
                value_vars=metrics,
                var_name="Metric",
                value_name="Value"
            )
            fig = px.bar(
                plot_df,
                x="Model Name",
                y="Value",
                color="Metric",
                title="Hybrid Benchmark Results",
                labels={"Model Name": "Model", "Value": "Score"},
                barmode="group"
            )
            plots["evalmix"] = fig
    # Radar chart for Light Eval
if benchmark_data[data_type]["light_eval"]:
df = create_light_eval_table(benchmark_data[data_type]["light_eval"])
if not df.empty:
            # Exclude non-metric display columns (create_light_eval_table has already renamed the columns)
            metric_cols = [col for col in df.columns if col not in ["Model Name", "Overall", "Dtype", "License", "file"]]
if metric_cols:
fig = go.Figure()
for _, row in df.iterrows():
fig.add_trace(go.Scatterpolar(
r=[row[col] for col in metric_cols],
theta=metric_cols,
fill='toself',
                        name=row.get("Model Name", "Unknown Model")
))
fig.update_layout(
polar=dict(
radialaxis=dict(
visible=True,
range=[0, 1]
)
),
title="Light Eval Results",
showlegend=True
)
plots["light_eval"] = fig
return plots
def create_combined_leaderboard_table(benchmark_data):
"""
Creates a combined leaderboard table from avg JSON data
"""
# Define benchmark types to include in the leaderboard
benchmark_types = ["evalmix", "light_eval", "retrieval", "arena", "human_arena"] # "lm_harness" removed
all_models = {}
# Process each benchmark type - exclude snake
for benchmark_type in benchmark_types:
# For human_arena and retrieval, use raw data since avg files don't have complete info
if benchmark_type in ["human_arena", "retrieval"]:
data_source = benchmark_data["raw"][benchmark_type]
else:
data_source = benchmark_data["avg"][benchmark_type]
# Skip if no data for this benchmark type
if not data_source:
continue
# Process each model in this benchmark type
for item in data_source:
model_name = item.get("model_name", "")
if not model_name:
continue
# Format the model name
formatted_model_name = format_model_name(model_name)
# Create entry for this model if it doesn't exist
if formatted_model_name not in all_models:
all_models[formatted_model_name] = {"model_name": formatted_model_name}
# Add metadata fields using lookup table
for field in ["dtype", "license"]:
if model_name in MODEL_METADATA_LOOKUP:
all_models[formatted_model_name][field] = MODEL_METADATA_LOOKUP[model_name][field]
else:
defaults = {"dtype": "unknown", "license": "Unknown"}
all_models[formatted_model_name][field] = defaults[field]
# Extract only the fields we care about for each benchmark type
if benchmark_type == "evalmix":
if "lexical_metric" in item:
all_models[formatted_model_name]["Lexical"] = round(item.get("lexical_metric", 0), 2)
if "semantic_metric" in item:
all_models[formatted_model_name]["Multilingual Semantic"] = round(item.get("semantic_metric", 0), 2)
# Extract Turkish Semantic score if available
if "turkish_semantic" in item:
all_models[formatted_model_name]["Turkish Semantic"] = round(item.get("turkish_semantic", 0), 2)
elif "turkish_semantic_" in item:
all_models[formatted_model_name]["Turkish Semantic"] = round(item.get("turkish_semantic_", 0), 2)
elif "nlp_metrics" in item and "cosine_similarity_turkish" in item.get("nlp_metrics", {}):
turkish_sim = item.get("nlp_metrics", {}).get("cosine_similarity_turkish", {}).get("mean", 0)
all_models[formatted_model_name]["Turkish Semantic"] = round(turkish_sim, 2)
# Extract Multilingual Semantic explicitly if available
if "multilingual_semantic" in item:
all_models[formatted_model_name]["Multilingual Semantic"] = round(item.get("multilingual_semantic", 0), 2)
elif "multilingual_semantic_" in item:
all_models[formatted_model_name]["Multilingual Semantic"] = round(item.get("multilingual_semantic_", 0), 2)
elif "nlp_metrics" in item and "cosine_similarity_multilingual" in item.get("nlp_metrics", {}):
multi_sim = item.get("nlp_metrics", {}).get("cosine_similarity_multilingual", {}).get("mean", 0)
all_models[formatted_model_name]["Multilingual Semantic"] = round(multi_sim, 2)
# Extract BERTScore F1 if available
if "bert_score" in item and isinstance(item.get("bert_score"), dict) and "f1" in item.get("bert_score", {}):
bert_f1 = item.get("bert_score", {}).get("f1", {}).get("mean", 0)
all_models[formatted_model_name]["BERTScore F1"] = round(bert_f1, 2)
elif "nlp_metrics" in item and "bert_score" in item.get("nlp_metrics", {}):
bert_f1 = item.get("nlp_metrics", {}).get("bert_score", {}).get("f1", {}).get("mean", 0)
all_models[formatted_model_name]["BERTScore F1"] = round(bert_f1, 2)
# Remove dtype and license from JSON - use only lookup table values
elif benchmark_type == "light_eval":
if "overall_average" in item:
try:
if isinstance(item["overall_average"], str) and item["overall_average"] != "N/A":
avg_value = float(item["overall_average"])
else:
avg_value = item["overall_average"]
all_models[formatted_model_name]["Light Eval"] = round(avg_value, 2)
except (ValueError, TypeError):
all_models[formatted_model_name]["Light Eval"] = item["overall_average"]
# Remove dtype and license from JSON - use only lookup table values
elif benchmark_type == "retrieval":
# Prefer RAG_score if available, otherwise use RAG_success_rate
if "RAG_score" in item:
avg_value = item["RAG_score"]
all_models[formatted_model_name]["Retrieval"] = round(avg_value, 4) # Higher precision for RAG Score
elif "RAG_success_rate" in item:
avg_value = item["RAG_success_rate"]
all_models[formatted_model_name]["Retrieval"] = round(avg_value, 2)
# Remove dtype and license from JSON - use only lookup table values
elif benchmark_type == "arena":
if "Melo Score" in item:
all_models[formatted_model_name]["Auto Elo Score"] = round(item.get("Melo Score", 0), 2)
# Remove dtype and license from JSON - use only lookup table values
elif benchmark_type == "human_arena":
if "elo_rating" in item:
all_models[formatted_model_name]["Human Elo Score"] = round(item.get("elo_rating", 0), 2)
# Remove dtype and license from JSON - use only lookup table values
# Create DataFrame from the collected data
if all_models:
df = pd.DataFrame(list(all_models.values()))
# Rename model_name column to be more user-friendly
if "model_name" in df.columns:
df = df.rename(columns={"model_name": "Model Name"})
# Rename metadata columns to proper case
column_mapping = {
"dtype": "Dtype",
"license": "License"
}
df = df.rename(columns=column_mapping)
# Make sure to remove the file column if it's present
if 'file' in df.columns:
df = df.drop(columns=['file'])
# Remove run_id and user_id fields if present
for field in ['run_id', 'user_id', 'Run Id', 'User Id']:
if field in df.columns:
df = df.drop(columns=[field])
# Define the exact columns we want to display in the order we want them
display_cols = [
"Auto Elo Score",
"Human Elo Score",
"Retrieval",
"Light Eval",
"Turkish Semantic",
"Multilingual Semantic",
"Lexical",
"Dtype",
"License"
]
valid_display_cols = [col for col in display_cols if col in df.columns]
# Fill NaN values with 0
for col in valid_display_cols:
df[col] = df[col].fillna(0)
# Explicitly reorder columns to match the UI display order exactly as in the screenshot
desired_order = ["Model Name", "Auto Elo Score", "Human Elo Score", "Retrieval", "Light Eval", "Turkish Semantic", "Multilingual Semantic", "Lexical", "Dtype", "License"]
# Filter out columns that don't exist in the DataFrame
actual_order = [col for col in desired_order if col in df.columns]
# Reorder columns
if len(actual_order) > 0:
df = df[actual_order]
# Sort by Auto Elo Score if available, otherwise by Human Elo Score
if "Auto Elo Score" in df.columns:
df = df.sort_values(by="Auto Elo Score", ascending=False)
elif "Human Elo Score" in df.columns:
df = df.sort_values(by="Human Elo Score", ascending=False)
        # Round float values to 2 decimal places
        for column in df.columns:
            try:
                if pd.api.types.is_float_dtype(df[column]):
                    df[column] = df[column].round(2)
            except Exception:
                continue
return df
return pd.DataFrame()
def create_raw_details_table(benchmark_data, benchmark_type):
"""
Creates a detailed table from raw JSON data for a specific benchmark type
"""
if not benchmark_data["raw"][benchmark_type]:
return pd.DataFrame()
# Flatten the raw data
flattened_data = []
for item in benchmark_data["raw"][benchmark_type]:
raw_model_name = item.get("model_name", "Unknown Model")
flat_item = {
"file": item.get("file", ""),
"model_name": format_model_name(raw_model_name)
}
# Always use lookup table values for metadata (override JSON values)
for field in ["dtype", "license"]:
if raw_model_name in MODEL_METADATA_LOOKUP:
flat_item[field] = MODEL_METADATA_LOOKUP[raw_model_name][field]
else:
defaults = {"dtype": "unknown", "license": "Unknown"}
flat_item[field] = defaults[field]
# Define metadata fields to exclude - especially for LightEval
excluded_fields = ["file", "job_id", "start_time", "end_time", "run_id", "user_id",
"total_samples", "Total Samples", "samples_number", "sample_count", "eval_samples",
"total_success_references", "Total Success References", "total_eval_samples",
"provider", "Provider"] # Exclude provider fields
# For LightEval, also exclude mmlu_tasks field
if benchmark_type == "light_eval":
excluded_fields.append("mmlu_tasks")
# Add top-level fields (skip metadata fields and dtype/license which come from lookup table)
for key, value in item.items():
if key not in excluded_fields and key not in ["dtype", "license"] and not key.startswith("_") and not isinstance(value, (dict, list)):
flat_item[key] = value
# Flatten nested fields
for key, value in item.items():
if key.startswith("_") or key in excluded_fields:
# Skip metadata fields
continue
elif isinstance(value, dict):
# Flatten nested dictionaries
_flatten_dict(value, flat_item, prefix=key)
elif isinstance(value, list) and all(isinstance(x, dict) for x in value):
# Flatten list of dictionaries
for i, sub_dict in enumerate(value):
_flatten_dict(sub_dict, flat_item, prefix=f"{key}_{i}")
flattened_data.append(flat_item)
# Create DataFrame
df = pd.DataFrame(flattened_data)
# Format confidence interval for arena data
if benchmark_type == "arena" and "95%(CI)" in df.columns:
def format_confidence_interval(ci_value):
"""Convert '-1.65/+2.66' to '+2.66/-1.65' format"""
if isinstance(ci_value, str) and "/" in ci_value:
parts = ci_value.split("/")
if len(parts) == 2:
negative_part = parts[0].strip()
positive_part = parts[1].strip()
# Remove the signs and get the numbers
if negative_part.startswith("-"):
negative_num = negative_part[1:]
else:
negative_num = negative_part
if positive_part.startswith("+"):
positive_num = positive_part[1:]
else:
positive_num = positive_part
# Return in +positive/-negative format
return f"+{positive_num}/-{negative_num}"
return ci_value
df["95%(CI)"] = df["95%(CI)"].apply(format_confidence_interval)
# Ensure model_name is first column
if "model_name" in df.columns:
cols = ["model_name"] + [col for col in df.columns if col != "model_name"]
df = df[cols]
    # Round float values to 2 decimal places
    for column in df.columns:
        try:
            if pd.api.types.is_float_dtype(df[column]):
                df[column] = df[column].round(2)
        except Exception:
            continue
# Remove the file column
if 'file' in df.columns:
df = df.drop(columns=['file'])
# Format column names for better display based on benchmark type
column_mapping = {
"model_name": "Model Name",
"dtype": "Dtype",
"license": "License"
}
# Use specific column mappings for each benchmark type
if benchmark_type == "arena":
# Arena benchmark column mappings
custom_columns = {
"Melo Score": "Auto Elo Score",
"Win Rate": "Win Rate",
"95%(CI)": "95% CI",
"Response Tokens Average": "Completion Tokens",
"dtype": "Dtype",
"Licance": "License",
}
column_mapping.update(custom_columns)
elif benchmark_type == "retrieval":
# RAG benchmark column mappings
custom_columns = {
"RAG_score": "RAG Score",
"RAG_success_rate": "Rag Success Rate",
"max_correct_references": "Max Correct Ref.",
"total_false_positives": "Hallucinate Ref.",
"total_missed_references": "Missed Ref.",
"average_judge_score": "Legal Judge Score"
# Removed "samples_number": "Total Samples"
}
column_mapping.update(custom_columns)
elif benchmark_type == "evalmix":
# Hybrid/EvalMix benchmark column mappings
custom_columns = {
"turkish_semantic_mean": "Turkish Semantic",
"turkish_semantic": "Turkish Semantic",
"multilingual_semantic_mean": "Multilingual Semantic",
"multilingual_semantic": "Multilingual Semantic",
"judge_metric": "Judge Score",
"bleu mean": "BLEU",
"rouge1 mean": "ROUGE-1",
"rouge2 mean": "ROUGE-2",
"rougeL mean": "ROUGE-L",
"bert_score f1 mean": "BERTScore F1",
"dtype": "Dtype",
"license": "License",
"bert_score precision mean": "BERTScore Precision"
# Removed "total_samples": "Total Samples"
}
column_mapping.update(custom_columns)
# Calculate Judge Average Score from OpenAI scores if they exist
if all(col in df.columns for col in ["openai_accuracy", "openai_relevance", "openai_coherence"]):
df["judge_average_score"] = df[["openai_accuracy", "openai_relevance", "openai_coherence"]].mean(axis=1).round(2)
column_mapping["judge_average_score"] = "Judge Score"
# Remove individual OpenAI score columns
columns_to_drop = ["openai_accuracy", "openai_relevance", "openai_coherence"]
for col in columns_to_drop:
if col in df.columns:
df = df.drop(columns=[col])
elif benchmark_type == "light_eval":
# Light Eval benchmark column mappings
custom_columns = {
"overall_average": "Overall",
"mmlu_average": "MMLU",
"truthfulqa": "Truthfulqa",
"winogrande": "Winogrande",
"hellaswag": "Hellaswag",
"gsm8k": "Gsm8k",
"arc_challenge": "ARC",
"dtype": "Dtype",
"license": "License"
}
column_mapping.update(custom_columns)
elif benchmark_type == "snake":
# Snake benchmark column mappings
custom_columns = {
"elo": "Elo Rating",
"win_rate": "Win Rate",
"draw_rate": "Draw Rate",
"dtype": "Dtype",
"license": "License"
}
column_mapping.update(custom_columns)
# For any columns not specifically mapped, use the default formatting
for col in df.columns:
if col not in column_mapping:
# Remove "mean" from column names
cleaned_col = col.replace(" mean", "")
# Format column name with default formatting
formatted_col = " ".join([word.capitalize() for word in cleaned_col.replace("_", " ").split()])
column_mapping[col] = formatted_col
# Rename DataFrame columns
df = df.rename(columns=column_mapping)
# Drop specific columns based on benchmark type
if benchmark_type == "retrieval" and "Success Ref." in df.columns:
df = df.drop(columns=["Success Ref."])
# Drop "Total Success References" column if it exists
if "Total Success References" in df.columns:
df = df.drop(columns=["Total Success References"])
# Sort by specific metrics based on benchmark type - AFTER column renaming
if benchmark_type == "arena" and "Auto Elo Score" in df.columns:
df = df.sort_values(by="Auto Elo Score", ascending=False)
# Define desired column order for Arena - metadata columns at the end
desired_cols = [
"Model Name",
"Auto Elo Score",
"Win Rate",
"95% CI",
"Completion Tokens",
"Dtype",
"License"
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Add any remaining columns that weren't in the desired list
remaining_cols = [col for col in df.columns if col not in final_cols]
final_cols.extend(remaining_cols)
# Set the new column order
df = df[final_cols]
elif benchmark_type == "retrieval":
# Sort by RAG Score if available, otherwise by Rag Success Rate
if "RAG Score" in df.columns:
df = df.sort_values(by="RAG Score", ascending=False)
primary_metric = "RAG Score"
elif "Rag Success Rate" in df.columns:
df = df.sort_values(by="Rag Success Rate", ascending=False)
primary_metric = "Rag Success Rate"
else:
primary_metric = None
# Define desired column order for Retrieval - metadata columns at the end
desired_cols = [
"Model Name",
"RAG Score",
"Rag Success Rate",
"Max Correct Ref.",
"Hallucinate Ref.",
"Missed Ref.",
"Legal Judge Score",
"Dtype",
"License"
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Add any remaining columns that weren't in the desired list
remaining_cols = [col for col in df.columns if col not in final_cols]
final_cols.extend(remaining_cols)
# Set the new column order
df = df[final_cols]
elif benchmark_type == "evalmix":
if "Turkish Semantic" in df.columns:
df = df.sort_values(by="Turkish Semantic", ascending=False)
# Define desired column order
desired_cols = [
"Model Name",
"Turkish Semantic",
"Multilingual Semantic",
"Judge Score",
"BLEU",
"ROUGE-1",
"ROUGE-2",
"ROUGE-L",
"BERTScore F1",
"BERTScore Precision",
"BERTScore Recall",
"Dtype",
"License"
# "Total Samples" removed
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Set the new column order
df = df[final_cols]
# elif benchmark_type == "lm_harness" and "Overall" in df.columns:
# df = df.sort_values(by="Overall", ascending=False)
elif benchmark_type == "light_eval" and "Overall" in df.columns:
df = df.sort_values(by="Overall", ascending=False)
elif benchmark_type == "snake":
# Sort by Elo or Elo Rating if available
if "Elo Rating" in df.columns:
df = df.sort_values(by="Elo Rating", ascending=False)
elif "Elo" in df.columns:
df = df.sort_values(by="Elo", ascending=False)
# Define desired column order for Snake - metadata columns at the end
desired_cols = [
"Model Name",
"Elo Rating",
"Win Rate",
"Draw Rate",
"Wins",
"Losses",
"Ties",
"Loss Rate",
"Dtype",
"License"
]
# Filter out columns that don't exist in the DataFrame
final_cols = [col for col in desired_cols if col in df.columns]
# Add any remaining columns that weren't in the desired list
remaining_cols = [col for col in df.columns if col not in final_cols]
final_cols.extend(remaining_cols)
# Set the new column order
df = df[final_cols]
return df
def _flatten_dict(d, target_dict, prefix=""):
"""
Flattens nested dictionaries
Args:
d: Dictionary to flatten
target_dict: Target dictionary to add flattened values to
prefix: Key prefix
"""
# List of fields to exclude when flattening
excluded_fields = ["total_success_references", "total_eval_samples",
"details", "metadata", "config", "logs"]
# List of special field name transformations
special_field_mappings = {
"turkish_semantic_mean": "turkish_semantic",
"turkish_semantic_ mean": "turkish_semantic",
"multilingual_semantic_mean": "multilingual_semantic"
}
for key, value in d.items():
# Skip excluded fields
if key in excluded_fields:
continue
# Apply special field name transformations
transformed_key = special_field_mappings.get(key, key)
new_key = f"{prefix}_{transformed_key}" if prefix else transformed_key
if isinstance(value, dict):
# Flatten nested dictionaries
_flatten_dict(value, target_dict, new_key)
elif isinstance(value, list) and all(isinstance(x, dict) for x in value):
# Flatten list of dictionaries
for i, sub_dict in enumerate(value):
_flatten_dict(sub_dict, target_dict, f"{new_key}_{i}")
elif isinstance(value, list) and len(value) > 0:
# Convert simple lists to string
try:
# For numeric lists, calculate mean and std
if all(isinstance(x, (int, float)) for x in value):
import numpy as np
target_dict[f"{new_key}_mean"] = round(sum(value) / len(value), 2)
if len(value) > 1:
target_dict[f"{new_key}_std"] = round(np.std(value), 2)
else:
# For non-numeric lists, convert to string
target_dict[new_key] = str(value)
            except Exception:
# Fallback to string representation
target_dict[new_key] = str(value)
else:
# Add other values directly
            # Round float values
if isinstance(value, float):
target_dict[new_key] = round(value, 2)
else:
target_dict[new_key] = value
def update_supported_base_models():
"""
    Updates the list of supported base models by querying the API.
This function is called when the application starts to keep the base model list up to date.
"""
try:
import requests
import json
import re
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Get API key from environment variable
api_key = os.getenv("API_KEY")
if not api_key:
logger.error("API_KEY not found in environment variables")
return None
# API endpoint and headers
url = os.getenv("API_URL")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
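        # NOTE: the request below deliberately uses a non-existent base_model so that the API's error
        # response enumerates the supported models, which are then parsed out of the message with a regex.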
# Test payload with non-existent model
payload = {
"source": "FILE_ID_BURAYA_GELECEK",
"base_model": "non-existent-model/fake-model-123",
"name": "test-invalid-model",
"description": "Desteklenen modelleri görmeye çalışıyorum"
}
# Make the request
response = requests.post(url, headers=headers, json=payload)
# Extract supported models from error message
if response.status_code != 200:
error_detail = response.json().get("detail", "")
# Extract the list of supported models using regex
match = re.search(r"list of supported models: \[(.*?)\]", error_detail)
if match:
supported_models_str = match.group(1)
# Parse the list of models without filtering out 'fast' models
supported_models = [model.strip("'") for model in supported_models_str.split(", ")]
# Update the base model list in the configuration
from api.config import update_base_model_list
update_base_model_list(supported_models)
logger.info(f"Successfully updated supported base models: {supported_models}")
return supported_models
else:
logger.error("Could not extract supported models from API response")
return None
else:
logger.error("Unexpected successful response from API")
return None
except Exception as e:
logger.error(f"Error updating supported base models: {str(e)}")
return None
def create_human_arena_table(data):
"""
Create Human Arena results table from detail data
"""
if not data:
return pd.DataFrame()
# Apply model name formatting and add metadata from lookup table
for item in data:
if "model_name" in item:
raw_model_name = item["model_name"]
item["model_name"] = format_model_name(raw_model_name)
# Always use lookup table values for metadata (override JSON values)
for field in ["dtype", "license"]:
if raw_model_name in MODEL_METADATA_LOOKUP:
item[field] = MODEL_METADATA_LOOKUP[raw_model_name][field]
else:
defaults = {"dtype": "unknown", "license": "Unknown"}
item[field] = defaults[field]
df = pd.DataFrame(data)
# Ensure model_name is first column
if "model_name" in df.columns:
cols = ["model_name"] + [col for col in df.columns if col != "model_name"]
df = df[cols]
# Define column mapping for better display
column_mapping = {
'model_name': 'Model Name',
'elo_rating': 'Human Elo Score',
'wins': 'Wins',
'losses': 'Losses',
'ties': 'Ties',
'total_games': 'Total Games',
'win_rate': 'Win Rate (%)',
'votes': 'Votes',
'dtype': 'Dtype',
'license': 'License',
'evaluation_date': 'Evaluation Date',
'evaluation_type': 'Type'
}
# Rename columns
df = df.rename(columns=column_mapping)
# Remove file, run_id, evaluation_date, evaluation_type, votes, and provider columns if present
columns_to_remove = ['file', 'run_id', 'Evaluation Date', 'Type', 'provider', 'Provider', 'Votes']
for col in columns_to_remove:
if col in df.columns:
df = df.drop(columns=[col])
# Sort by Human Elo Score in descending order
if 'Human Elo Score' in df.columns:
df = df.sort_values(by='Human Elo Score', ascending=False)
# Round numeric columns
numeric_cols = ['Human Elo Score', 'Win Rate (%)']
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').round(2)
return df
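
# Minimal local usage sketch (illustrative, not part of the original flow): load whatever result
# files exist under result/ and print the combined leaderboard, assuming the directory layout
# documented in load_benchmark_results().
if __name__ == "__main__":
    results = load_benchmark_results()
    leaderboard = create_combined_leaderboard_table(results)
    if leaderboard.empty:
        print("No benchmark results found under result/.")
    else:
        print(leaderboard.to_string(index=False))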