Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Game Reasoning Arena — Hugging Face Spaces Gradio App | |
Pipeline: | |
User clicks "Start Game" in Gradio | |
↓ | |
app.py (play_game) | |
↓ | |
ui/gradio_config_generator.py (run_game_with_existing_infrastructure) | |
↓ | |
src/game_reasoning_arena/ (core game infrastructure) | |
↓ | |
Game results + metrics displayed in Gradio | |
""" | |
from __future__ import annotations | |
import sqlite3 | |
import sys | |
import shutil | |
from pathlib import Path | |
from typing import List, Dict, Any, Tuple, Generator, TypedDict | |
import pandas as pd | |
import gradio as gr | |
# Logging (optional) | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
log = logging.getLogger("arena_space") | |
# Optional transformers import | |
try: | |
from transformers import pipeline # noqa: F401 | |
except Exception: | |
pass | |
# Make sure src is on PYTHONPATH | |
src_path = Path(__file__).parent / "src" | |
if str(src_path) not in sys.path: | |
sys.path.insert(0, str(src_path)) | |
# Try to import game registry | |
from game_reasoning_arena.arena.games.registry import registry as games_registry | |
from game_reasoning_arena.backends.huggingface_backend import ( | |
HuggingFaceBackend, | |
) | |
from game_reasoning_arena.backends import ( | |
initialize_llm_registry, LLM_REGISTRY, | |
) | |
BACKEND_SYSTEM_AVAILABLE = True | |
# ----------------------------------------------------------------------------- | |
# Config & constants | |
# ----------------------------------------------------------------------------- | |
# HF demo-safe tiny models (CPU friendly) | |
HUGGINGFACE_MODELS: Dict[str, str] = { | |
"gpt2": "gpt2", | |
"distilgpt2": "distilgpt2", | |
"google/flan-t5-small": "google/flan-t5-small", | |
"EleutherAI/gpt-neo-125M": "EleutherAI/gpt-neo-125M", | |
} | |
GAMES_REGISTRY: Dict[str, Any] = {} | |
db_dir = Path(__file__).resolve().parent / "results" | |
LEADERBOARD_COLUMNS = [ | |
"agent_name", "agent_type", "# games", "total rewards", | |
"avg_generation_time (sec)", "win-rate", "win vs_random (%)", | |
] | |
# ----------------------------------------------------------------------------- | |
# Init backend + register models (optional) | |
# ----------------------------------------------------------------------------- | |
huggingface_backend = None | |
if BACKEND_SYSTEM_AVAILABLE: | |
try: | |
huggingface_backend = HuggingFaceBackend() | |
initialize_llm_registry() | |
for model_name in HUGGINGFACE_MODELS.keys(): | |
if huggingface_backend.is_model_available(model_name): | |
registry_key = f"hf_{model_name}" | |
LLM_REGISTRY[registry_key] = { | |
"backend": huggingface_backend, | |
"model_name": model_name, | |
} | |
log.info("Registered HuggingFace model: %s", registry_key) | |
except Exception as e: | |
log.error("Failed to initialize HuggingFace backend: %s", e) | |
huggingface_backend = None | |
# ----------------------------------------------------------------------------- | |
# Load games registry | |
# ----------------------------------------------------------------------------- | |
try: | |
if games_registry is not None: | |
GAMES_REGISTRY = { | |
name: cls for name, cls in games_registry._registry.items() | |
} | |
log.info("Successfully imported full arena - games are playable.") | |
else: | |
GAMES_REGISTRY = {} | |
except Exception as e: | |
log.warning("Failed to load games registry: %s", e) | |
GAMES_REGISTRY = {} | |
def _get_game_display_mapping() -> Dict[str, str]: | |
""" | |
Build a mapping from internal game keys to their human‑friendly display names. | |
If the registry is not available or a game has no explicit display_name, | |
fall back to a title‑cased version of the internal key. | |
""" | |
mapping: Dict[str, str] = {} | |
if games_registry is not None and hasattr(games_registry, "_registry"): | |
for key, info in games_registry._registry.items(): | |
display = info.get("display_name") if isinstance(info, dict) else None | |
if not display: | |
display = key.replace("_", " ").title() | |
mapping[key] = display | |
return mapping | |
# ----------------------------------------------------------------------------- | |
# DB helpers | |
# ----------------------------------------------------------------------------- | |
def ensure_results_dir() -> None: | |
db_dir.mkdir(parents=True, exist_ok=True) | |
def iter_agent_databases() -> Generator[Tuple[str, str, str], None, None]: | |
"""Yield (db_file, agent_type, model_name) for non-random agents.""" | |
for db_file in find_or_download_db(): | |
agent_type, model_name = extract_agent_info(db_file) | |
if agent_type != "random": | |
yield db_file, agent_type, model_name | |
def find_or_download_db() -> List[str]: | |
"""Return .db files; ensure random_None.db exists with minimal schema.""" | |
ensure_results_dir() | |
random_db_path = db_dir / "random_None.db" | |
if not random_db_path.exists(): | |
conn = sqlite3.connect(str(random_db_path)) | |
try: | |
conn.execute( | |
""" | |
CREATE TABLE IF NOT EXISTS games ( | |
id INTEGER PRIMARY KEY, | |
game_name TEXT, | |
player1 TEXT, | |
player2 TEXT, | |
winner INTEGER, | |
timestamp TEXT | |
) | |
""" | |
) | |
conn.commit() | |
finally: | |
conn.close() | |
return [str(p) for p in db_dir.glob("*.db")] | |
def extract_agent_info(filename: str) -> Tuple[str, str]: | |
base_name = Path(filename).stem | |
parts = base_name.split("_", 1) | |
if len(parts) == 2: | |
return parts[0], parts[1] | |
return parts[0], "Unknown" | |
def get_available_games(include_aggregated: bool = True) -> List[str]: | |
"""Union of games seen in DBs and in registry.""" | |
game_names = set() | |
# From DBs | |
for db_file in find_or_download_db(): | |
conn = sqlite3.connect(db_file) | |
try: | |
df = pd.read_sql_query( | |
"SELECT DISTINCT game_name FROM moves", conn | |
) | |
game_names.update(df["game_name"].tolist()) | |
except Exception: | |
pass | |
finally: | |
conn.close() | |
# From registry | |
if GAMES_REGISTRY: | |
game_names.update(GAMES_REGISTRY.keys()) | |
if not game_names: | |
game_names.update(["tic_tac_toe", "kuhn_poker", "connect_four"]) | |
game_list = sorted(game_names) | |
if include_aggregated: | |
game_list.insert(0, "Aggregated Performance") | |
return game_list | |
def extract_illegal_moves_summary() -> pd.DataFrame: | |
"""# illegal moves per agent.""" | |
summary = [] | |
for db_file, agent_type, model_name in iter_agent_databases(): | |
conn = sqlite3.connect(db_file) | |
try: | |
df = pd.read_sql_query( | |
"SELECT COUNT(*) AS illegal_moves FROM illegal_moves", conn | |
) | |
count = int(df["illegal_moves"].iloc[0]) if not df.empty else 0 | |
except Exception: | |
count = 0 | |
finally: | |
conn.close() | |
summary.append({"agent_name": model_name, "illegal_moves": count}) | |
return pd.DataFrame(summary) | |
# ----------------------------------------------------------------------------- | |
# Player config | |
# ----------------------------------------------------------------------------- | |
class PlayerConfigData(TypedDict, total=False): | |
player_types: List[str] | |
player_type_display: Dict[str, str] | |
available_models: List[str] | |
class GameArenaConfig(TypedDict, total=False): | |
available_games: List[str] | |
player_config: PlayerConfigData | |
model_info: str | |
backend_available: bool | |
def setup_player_config( | |
player_type: str, player_model: str, player_id: str | |
) -> Dict[str, Any]: | |
"""Map dropdown selection to agent config for the runner.""" | |
if player_type == "random_bot": | |
return {"type": "random"} | |
if ( | |
player_type | |
and ( | |
player_type.startswith("llm_") | |
or player_type.startswith("hf_") | |
) | |
): | |
model_id = player_type.split("_", 1)[1] | |
if BACKEND_SYSTEM_AVAILABLE and model_id in HUGGINGFACE_MODELS: | |
return {"type": "llm", "model": model_id} | |
if ( | |
player_type == "llm" | |
and player_model in HUGGINGFACE_MODELS | |
and BACKEND_SYSTEM_AVAILABLE | |
): | |
return {"type": "llm", "model": player_model} | |
return {"type": "random"} | |
def create_player_config() -> GameArenaConfig: | |
# Internal names for arena dropdown | |
available_keys = get_available_games(include_aggregated=False) | |
# Map internal names to display names | |
key_to_display = _get_game_display_mapping() | |
available_games = [ | |
key_to_display.get(key, key.replace("_", " ").title()) | |
for key in available_keys | |
] | |
# Collect models seen in DBs for charts/labels | |
database_models = [model for _, _, model in iter_agent_databases()] | |
player_types = ["random_bot"] | |
player_type_display = {"random_bot": "Random Bot"} | |
if BACKEND_SYSTEM_AVAILABLE: | |
for model_key in HUGGINGFACE_MODELS.keys(): | |
key = f"hf_{model_key}" | |
player_types.append(key) | |
tag = model_key.split("/")[-1] | |
player_type_display[key] = f"HuggingFace: {tag}" | |
all_models = list(HUGGINGFACE_MODELS.keys()) + database_models | |
model_info = ( | |
"HuggingFace transformer models integrated with backend system." | |
if BACKEND_SYSTEM_AVAILABLE | |
else "Backend system not available - limited functionality." | |
) | |
# Build display→key mapping for games | |
display_to_key = { | |
key_to_display.get(key, key.replace("_", " ").title()): key | |
for key in available_keys | |
} | |
return { | |
"available_games": available_games, | |
"game_display_to_key": display_to_key, | |
"player_config": { | |
"player_types": player_types, | |
"player_type_display": player_type_display, | |
"available_models": all_models, | |
}, | |
"model_info": model_info, | |
"backend_available": BACKEND_SYSTEM_AVAILABLE, | |
} | |
# ----------------------------------------------------------------------------- | |
# Main game entry | |
# ----------------------------------------------------------------------------- | |
def play_game( | |
game_name: str, | |
player1_type: str, | |
player2_type: str, | |
player1_model: str | None = None, | |
player2_model: str | None = None, | |
rounds: int = 1, | |
) -> str: | |
if game_name == "No Games Found": | |
return "No games available. Please add game databases." | |
log.info( | |
"Starting game: %s | P1=%s(%s) P2=%s(%s) rounds=%d", | |
game_name, | |
player1_type, | |
player1_model, | |
player2_type, | |
player2_model, | |
rounds, | |
) | |
# Map human‑friendly game name back to internal key if needed | |
config = create_player_config() | |
if "game_display_to_key" in config and game_name in config["game_display_to_key"]: | |
game_name = config["game_display_to_key"][game_name] | |
# Map display labels for player types back to keys | |
display_to_key = { | |
v: k for k, v in config["player_config"]["player_type_display"].items() | |
} | |
if player1_type in display_to_key: | |
player1_type = display_to_key[player1_type] | |
if player2_type in display_to_key: | |
player2_type = display_to_key[player2_type] | |
try: | |
from ui.gradio_config_generator import ( | |
run_game_with_existing_infrastructure, | |
) | |
result = run_game_with_existing_infrastructure( | |
game_name=game_name, | |
player1_type=player1_type, | |
player2_type=player2_type, | |
player1_model=player1_model, | |
player2_model=player2_model, | |
rounds=rounds, | |
seed=42, | |
) | |
return result | |
except Exception as e: | |
return f"Error during game simulation: {e}" | |
def extract_leaderboard_stats(game_name: str) -> pd.DataFrame: | |
all_stats = [] | |
for db_file, agent_type, model_name in iter_agent_databases(): | |
conn = sqlite3.connect(db_file) | |
try: | |
if game_name == "Aggregated Performance": | |
# get totals across all games in this DB | |
df = pd.read_sql_query( | |
"SELECT COUNT(DISTINCT episode) AS games_played, SUM(reward) AS total_rewards " | |
"FROM game_results", | |
conn, | |
) | |
avg_time = conn.execute( | |
"SELECT AVG(generation_time) FROM moves" | |
).fetchone()[0] or 0 | |
wins_vs_random = conn.execute( | |
"SELECT COUNT(*) FROM game_results " | |
"WHERE opponent = 'random_None' AND reward > 0", | |
).fetchone()[0] or 0 | |
total_vs_random = conn.execute( | |
"SELECT COUNT(*) FROM game_results " | |
"WHERE opponent = 'random_None'", | |
).fetchone()[0] or 0 | |
else: | |
# filter by the selected game | |
df = pd.read_sql_query( | |
"SELECT COUNT(DISTINCT episode) AS games_played, SUM(reward) AS total_rewards " | |
"FROM game_results WHERE game_name = ?", | |
conn, | |
params=(game_name,), | |
) | |
avg_time = conn.execute( | |
"SELECT AVG(generation_time) FROM moves WHERE game_name = ?", | |
(game_name,), | |
).fetchone()[0] or 0 | |
wins_vs_random = conn.execute( | |
"SELECT COUNT(*) FROM game_results " | |
"WHERE opponent = 'random_None' AND reward > 0 AND game_name = ?", | |
(game_name,), | |
).fetchone()[0] or 0 | |
total_vs_random = conn.execute( | |
"SELECT COUNT(*) FROM game_results " | |
"WHERE opponent = 'random_None' AND game_name = ?", | |
(game_name,), | |
).fetchone()[0] or 0 | |
# If there were no results for this game, df will be empty or NaNs. | |
if df.empty or df["games_played"].iloc[0] is None: | |
games_played = 0 | |
total_rewards = 0.0 | |
else: | |
games_played = int(df["games_played"].iloc[0] or 0) | |
total_rewards = float(df["total_rewards"].iloc[0] or 0) / 2.0 | |
vs_random_rate = ( | |
(wins_vs_random / total_vs_random) * 100.0 | |
if total_vs_random > 0 | |
else 0.0 | |
) | |
# Build a single-row DataFrame for this agent | |
row = { | |
"agent_name": model_name, | |
"agent_type": agent_type, | |
"# games": games_played, | |
"total rewards": total_rewards, | |
"avg_generation_time (sec)": round(float(avg_time), 3), | |
"win-rate": round(vs_random_rate, 2), | |
"win vs_random (%)": round(vs_random_rate, 2), | |
} | |
all_stats.append(pd.DataFrame([row])) | |
finally: | |
conn.close() | |
# Concatenate all rows; if all_stats is empty, return an empty DataFrame with columns. | |
if not all_stats: | |
return pd.DataFrame(columns=LEADERBOARD_COLUMNS) | |
leaderboard_df = pd.concat(all_stats, ignore_index=True) | |
return leaderboard_df[LEADERBOARD_COLUMNS] | |
# ----------------------------------------------------------------------------- | |
# Simple plotting helpers | |
# ----------------------------------------------------------------------------- | |
def create_bar_plot( | |
data: pd.DataFrame, | |
x_col: str, | |
y_col: str, | |
title: str, | |
x_label: str, | |
y_label: str, | |
) -> gr.BarPlot: | |
return gr.BarPlot( | |
value=data, | |
x=x_col, | |
y=y_col, | |
title=title, | |
x_label=x_label, | |
y_label=y_label, | |
) | |
# ----------------------------------------------------------------------------- | |
# Upload handler (save .db files to scripts/results/) | |
# ----------------------------------------------------------------------------- | |
def handle_db_upload(files: list[gr.File]) -> str: | |
ensure_results_dir() | |
saved = [] | |
for f in files or []: | |
dest = db_dir / Path(f.name).name | |
Path(f.name).replace(dest) | |
saved.append(dest.name) | |
return ( | |
f"Uploaded: {', '.join(saved)}" if saved else "No files uploaded." | |
) | |
# ----------------------------------------------------------------------------- | |
# UI | |
# ----------------------------------------------------------------------------- | |
with gr.Blocks() as interface: | |
with gr.Tab("Game Arena"): | |
config = create_player_config() | |
gr.Markdown("# LLM Game Arena") | |
gr.Markdown("Play games against LLMs or watch LLMs compete!") | |
gr.Markdown( | |
f"> **🤖 Available AI Players**: {config['model_info']}\n" | |
"> Local transformer models run with Hugging Face transformers. " | |
"No API tokens required!" | |
) | |
with gr.Row(): | |
game_dropdown = gr.Dropdown( | |
choices=config["available_games"], | |
label="Select a Game", | |
value=( | |
config["available_games"][0] | |
if config["available_games"] | |
else "No Games Found" | |
), | |
) | |
rounds_slider = gr.Slider( | |
minimum=1, | |
maximum=10, | |
value=1, | |
step=1, | |
label="Number of Rounds", | |
) | |
def player_selector_block(label: str): | |
gr.Markdown(f"### {label}") | |
choices_pairs = [ | |
(key, config["player_config"]["player_type_display"][key]) | |
for key in config["player_config"]["player_types"] | |
] | |
dd_type = gr.Dropdown( | |
choices=choices_pairs, | |
label=f"{label} Type", | |
value=choices_pairs[0][0], | |
) | |
dd_model = gr.Dropdown( | |
choices=config["player_config"]["available_models"], | |
label=f"{label} Model (if LLM)", | |
visible=False, | |
) | |
return dd_type, dd_model | |
with gr.Row(): | |
p1_type, p1_model = player_selector_block("Player 1") | |
p2_type, p2_model = player_selector_block("Player 2") | |
def _vis(player_type: str): | |
is_llm = ( | |
player_type == "llm" | |
or ( | |
player_type | |
and ( | |
player_type.startswith("llm_") | |
or player_type.startswith("hf_") | |
) | |
) | |
) | |
return gr.update(visible=is_llm) | |
p1_type.change(_vis, inputs=p1_type, outputs=p1_model) | |
p2_type.change(_vis, inputs=p2_type, outputs=p2_model) | |
play_button = gr.Button("🎮 Start Game", variant="primary") | |
game_output = gr.Textbox( | |
label="Game Log", | |
lines=20, | |
placeholder="Game results will appear here...", | |
) | |
play_button.click( | |
play_game, | |
inputs=[ | |
game_dropdown, | |
p1_type, | |
p2_type, | |
p1_model, | |
p2_model, | |
rounds_slider, | |
], | |
outputs=[game_output], | |
) | |
with gr.Tab("Leaderboard"): | |
gr.Markdown( | |
"# LLM Model Leaderboard\n" | |
"Track performance across different games!" | |
) | |
leaderboard_game_dropdown = gr.Dropdown( | |
choices=get_available_games(), | |
label="Select Game", | |
value="Aggregated Performance", | |
) | |
leaderboard_table = gr.Dataframe( | |
value=extract_leaderboard_stats("Aggregated Performance"), | |
headers=LEADERBOARD_COLUMNS, | |
interactive=False, | |
) | |
refresh_btn = gr.Button("🔄 Refresh") | |
def _update_leaderboard(game: str) -> pd.DataFrame: | |
return extract_leaderboard_stats(game) | |
leaderboard_game_dropdown.change( | |
_update_leaderboard, | |
inputs=[leaderboard_game_dropdown], | |
outputs=[leaderboard_table], | |
) | |
refresh_btn.click( | |
_update_leaderboard, | |
inputs=[leaderboard_game_dropdown], | |
outputs=[leaderboard_table], | |
) | |
gr.Markdown("### Upload new `.db` result files") | |
db_files = gr.Files(file_count="multiple", file_types=[".db"]) | |
upload_btn = gr.Button("⬆️ Upload to results/") | |
upload_status = gr.Markdown() | |
upload_btn.click( | |
handle_db_upload, inputs=[db_files], outputs=[upload_status] | |
) | |
with gr.Tab("Metrics Dashboard"): | |
gr.Markdown( | |
"# 📊 Metrics Dashboard\n" | |
"Visual summaries of LLM performance across games." | |
) | |
metrics_df = extract_leaderboard_stats("Aggregated Performance") | |
with gr.Row(): | |
create_bar_plot( | |
data=metrics_df, | |
x_col="agent_name", | |
y_col="win vs_random (%)", | |
title="Win Rate vs Random Bot", | |
x_label="LLM Model", | |
y_label="Win Rate (%)", | |
) | |
with gr.Row(): | |
create_bar_plot( | |
data=metrics_df, | |
x_col="agent_name", | |
y_col="avg_generation_time (sec)", | |
title="Average Generation Time", | |
x_label="LLM Model", | |
y_label="Time (sec)", | |
) | |
with gr.Row(): | |
gr.Dataframe( | |
value=metrics_df, | |
label="Performance Summary", | |
interactive=False, | |
) | |
with gr.Tab("Analysis of LLM Reasoning"): | |
gr.Markdown( | |
"# 🧠 Analysis of LLM Reasoning\n" | |
"Insights into move legality and decision behavior." | |
) | |
illegal_df = extract_illegal_moves_summary() | |
with gr.Row(): | |
create_bar_plot( | |
data=illegal_df, | |
x_col="agent_name", | |
y_col="illegal_moves", | |
title="Illegal Moves by Model", | |
x_label="LLM Model", | |
y_label="# of Illegal Moves", | |
) | |
with gr.Row(): | |
gr.Dataframe( | |
value=illegal_df, | |
label="Illegal Move Summary", | |
interactive=False, | |
) | |
with gr.Tab("About"): | |
gr.Markdown( | |
""" | |
# About Game Reasoning Arena | |
This app analyzes and visualizes LLM performance in games. | |
- **Game Arena**: Play games vs. LLMs or watch LLM vs. LLM | |
- **Leaderboard**: Performance statistics across games | |
- **Metrics Dashboard**: Visual summaries | |
- **Reasoning Analysis**: Illegal moves & behavior | |
**Data**: SQLite databases in `/results/`. | |
""" | |
) | |
# Local run only. On Spaces, the runtime will serve `interface` automatically. | |
if __name__ == "__main__": | |
interface.launch(server_name="0.0.0.0", server_port=None, show_api=False) | |