| | |
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import csv |
| | import importlib.util |
| | import json |
| | import re |
| | import statistics |
| | import subprocess |
| | import itertools |
| | import math |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Any |
| |
|
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Shared path prefixes, named once so the constants below stay short.
_FAST_AGENT_DIR = ROOT / '.fast-agent'
_SCRIPTS_DIR = ROOT / 'scripts'

# Default for --base-cards-dir: holds the base card and base tool file.
BASE_CARDS_DIR = _FAST_AGENT_DIR / 'tool-cards'
# Default for --prompts: one challenge prompt per non-blank line.
PROMPTS_FILE = _SCRIPTS_DIR / 'hf_hub_community_challenges.txt'
# Default for --variants: JSON list of description/docstring variants.
VARIANTS_FILE = _SCRIPTS_DIR / 'tool_description_variants.json'
# Default for --out-dir: where the reports are written.
OUT_DIR = ROOT / 'docs' / 'tool_description_eval'
# Per-variant card directories are generated underneath this root.
CARDS_OUT_ROOT = _FAST_AGENT_DIR / 'evals' / 'tool_desc_ab' / 'cards'
# Name of the wrapper agent used when running with --indirect.
INDIRECT_ROUTER_NAME = 'hf_hub_community_router'

# Matches ANSI escape sequences so console output can be stored as plain text.
ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
| |
|
| | |
# Regex fragments shared by several cases below.
_OVERVIEW_PATTERNS = (r"/users/[^/]+/overview", r"/organizations/[^/]+/overview")
_RECENT_ACTIVITY = r"/(api/)?recent-activity"

# Expected first action per case id (1-based position of the prompt in the
# prompts file).  A rule is either:
#   {"any": [regex, ...]}   -> the first endpoint must match one of the regexes
#   {"no_tool_call": True}  -> the run must not make any tool call at all
FIRST_ENDPOINT_EXPECTED: dict[int, dict[str, Any]] = {
    1: {"any": list(_OVERVIEW_PATTERNS)},
    2: {"any": [r"/users/[^/]+/followers"]},
    3: {"any": [_RECENT_ACTIVITY]},
    4: {"any": [_RECENT_ACTIVITY]},
    5: {"any": [_RECENT_ACTIVITY]},
    6: {"any": [r"/models/[^/]+/[^/]+/discussions"]},
    7: {"no_tool_call": True},
    8: {"no_tool_call": True},
    9: {"any": [r"/whoami-v2", _RECENT_ACTIVITY]},
    10: {"any": list(_OVERVIEW_PATTERNS)},
}
| |
|
| |
|
@dataclass
class RunRow:
    """Outcome of running one prompt against one (variant, model) pair."""

    # --- identity -------------------------------------------------------
    case_id: int  # 1-based index of the prompt in the prompts file
    prompt: str
    variant: str  # variant id taken from the variants file
    model: str

    # --- observations from the run ---------------------------------------
    returncode: int  # exit status of the fast-agent subprocess
    has_tool_call: bool  # True when at least one tool call was recorded
    endpoint_calls: int  # number of tool calls that carried an `endpoint` argument
    first_endpoint: str | None  # first such endpoint, if any
    first_call_correct: bool | None  # None when the case was not evaluated

    # --- optional scorer output (all None when no scorer ran) -------------
    score_total: int | None
    score_endpoint: int | None
    score_efficiency: int | None
    score_reasoning: int | None
    score_safety: int | None
    score_clarity: int | None

    # --- provenance --------------------------------------------------------
    result_file: str | None  # path of the fast-agent --results JSON
    merged: str  # transcript text assembled from the results file
| |
|
| |
|
def row_key(r: RunRow) -> tuple[int, str, str]:
    """Return the ``(case_id, variant, model)`` triple that identifies a row.

    Used as the dictionary key when freshly produced rows are merged over
    previously saved ones.
    """
    case_id, variant, model = r.case_id, r.variant, r.model
    return case_id, variant, model
| |
|
| |
|
def load_existing_rows(out_dir: Path) -> list[RunRow]:
    """Load previously written detailed results from *out_dir*.

    Reads ``tool_description_ab_detailed.json`` and rebuilds one ``RunRow``
    per usable entry.

    Args:
        out_dir: Directory that may contain the detailed results file.

    Returns:
        The reconstructed rows, or an empty list when the file does not exist.
        Entries that are not JSON objects, or that lack an integer ``case_id``,
        are skipped: they cannot be keyed for merging and would break the
        case-id sorting done later.

    Raises:
        ValueError: If the file's top-level JSON value is not a list.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    detailed_path = out_dir / 'tool_description_ab_detailed.json'
    if not detailed_path.exists():
        return []

    data = json.loads(detailed_path.read_text(encoding='utf-8'))
    if not isinstance(data, list):
        raise ValueError(f'Expected a JSON list in {detailed_path}')

    rows: list[RunRow] = []
    for entry in data:
        if not isinstance(entry, dict):
            continue
        case_id = entry.get('case_id')
        if not isinstance(case_id, int):
            continue

        # The per-dimension scores live in a nested object; tolerate it being
        # absent or null.
        score = entry.get('score')
        if not isinstance(score, dict):
            score = {}

        rows.append(RunRow(
            case_id=case_id,
            prompt=entry.get('prompt', ''),
            variant=entry.get('variant', ''),
            model=entry.get('model', ''),
            returncode=entry.get('returncode', 1),
            has_tool_call=entry.get('has_tool_call', False),
            endpoint_calls=entry.get('endpoint_calls', 0),
            first_endpoint=entry.get('first_endpoint'),
            first_call_correct=entry.get('first_call_correct'),
            score_total=entry.get('score_total'),
            score_endpoint=score.get('endpoint'),
            score_efficiency=score.get('efficiency'),
            score_reasoning=score.get('reasoning'),
            score_safety=score.get('safety'),
            score_clarity=score.get('clarity'),
            result_file=entry.get('result_file'),
            merged=entry.get('merged', ''),
        ))
    return rows
| |
|
| |
|
def strip_ansi(text: str) -> str:
    """Return *text* with every match of ``ANSI_RE`` removed."""
    cleaned, _removed = ANSI_RE.subn('', text)
    return cleaned
| |
|
| |
|
def load_prompts(path: Path) -> list[str]:
    """Read prompts from *path*, one per line.

    Each line is stripped of surrounding whitespace; lines that end up empty
    are dropped.  The file is read as UTF-8.
    """
    raw = path.read_text(encoding='utf-8')
    stripped = (line.strip() for line in raw.splitlines())
    return list(filter(None, stripped))
| |
|
| |
|
def load_variants(path: Path) -> list[dict[str, str]]:
    """Load and validate the variant definitions stored in *path*.

    The file must contain a JSON list of objects, each with non-empty string
    values for ``id``, ``card_description`` and ``hf_api_request_docstring``.

    Args:
        path: JSON file to read (UTF-8).

    Returns:
        One dict per variant containing exactly those three keys; any other
        keys present in the file are dropped.

    Raises:
        ValueError: If the top-level value is not a list, or an item is not an
            object, or a required field is missing, empty, or not a string.
    """
    required = ('id', 'card_description', 'hf_api_request_docstring')

    data = json.loads(path.read_text(encoding='utf-8'))
    if not isinstance(data, list):
        raise ValueError('variants file must be a JSON list')

    out: list[dict[str, str]] = []
    for item in data:
        # Reject non-objects with the same ValueError as any other malformed
        # item instead of failing on `.get`.
        if not isinstance(item, dict):
            raise ValueError(f'Invalid variant item: {item}')
        picked = {key: item.get(key) for key in required}
        # The values are later used as a path component and as text for
        # string/regex substitution, so they must be real, non-empty strings.
        if not all(isinstance(value, str) and value for value in picked.values()):
            raise ValueError(f'Invalid variant item: {item}')
        out.append(picked)
    return out
| |
|
| |
|
def maybe_import_base_scorer() -> Any | None:
    """Import ``scripts/score_hf_hub_community_challenges.py`` if it exists.

    Returns:
        The loaded module, or ``None`` when the file is missing or no import
        spec/loader can be created for it.  Exceptions raised while executing
        the module propagate to the caller.
    """
    import sys

    scorer_path = ROOT / 'scripts' / 'score_hf_hub_community_challenges.py'
    if not scorer_path.exists():
        return None

    spec = importlib.util.spec_from_file_location('base_scorer', scorer_path)
    if not (spec and spec.loader):
        return None

    module = importlib.util.module_from_spec(spec)
    # Register the module before executing it so that it is discoverable
    # through sys.modules while its own top-level code runs.
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
| |
|
| |
|
def replace_card_description(base_card_text: str, new_description: str) -> str:
    """Swap the first ``description: "..."`` line of a card for a new one.

    Args:
        base_card_text: Full text of the base agent card.
        new_description: Replacement description, unescaped.

    Returns:
        The card text with the first matching description line rewritten as
        ``description: "<escaped new_description>"``.

    Raises:
        ValueError: If no double-quoted ``description:`` line is found.
    """
    # Escape for a double-quoted scalar: backslashes first, then quotes, so
    # the backslashes added for the quotes are not doubled again.
    escaped = new_description.replace('\\', '\\\\').replace('"', '\\"')
    new_line = f'description: "{escaped}"'

    # A callable replacement is inserted verbatim.  Passing the text as a
    # template string would make `re` interpret its backslashes (`\1`, `\n`,
    # `\d`, ...), corrupting or rejecting descriptions that contain them.
    # The trailing class excludes "\n" so a following blank line is kept.
    replaced, n = re.subn(
        r'(?m)^description:\s*".*"[ \t\r]*$',
        lambda _match: new_line,
        base_card_text,
        count=1,
    )
    if n == 0:
        raise ValueError('Could not find frontmatter description line in base card')
    return replaced
| |
|
| |
|
def replace_hf_api_docstring(base_tool_text: str, new_docstring: str) -> str:
    """Replace the docstring of ``hf_api_request`` in the tool source text.

    Args:
        base_tool_text: Source text of the tool module.  It must contain
            ``def hf_api_request(...) -> dict[str, Any]:`` immediately followed
            by a triple-double-quoted docstring.
        new_docstring: New docstring body.  Leading and trailing newlines are
            stripped; the text is otherwise inserted verbatim, without
            re-indentation.

    Returns:
        The source text with only the first matching docstring replaced.

    Raises:
        ValueError: If the function header plus docstring cannot be found.
    """
    pattern = re.compile(
        r"(def hf_api_request\([\s\S]*?\) -> dict\[str, Any\]:\n\s*)\"\"\"[\s\S]*?\"\"\"",
    )
    body = new_docstring.strip('\n')

    def _rebuild(match: re.Match[str]) -> str:
        header = match.group(1)
        # Whatever follows the last newline of the header is the indentation
        # of the opening quotes; reuse it for the closing quotes.
        indent = header.rpartition('\n')[2]
        return f'{header}"""\n{body}\n{indent}"""'

    # A callable replacement keeps backslashes in the docstring literal.  A
    # template string would have `re` interpret them as escapes or group
    # references.
    replaced, n = pattern.subn(_rebuild, base_tool_text, count=1)
    if n == 0:
        raise ValueError('Could not replace hf_api_request docstring')
    return replaced
| |
|
| |
|
def prepare_variant_cards(
    variant: dict[str, str],
    *,
    base_card_path: Path,
    base_tool_path: Path,
) -> Path:
    """Materialise the card and tool files for one variant.

    Reads the base card and base tool source, applies the variant's
    ``card_description`` and ``hf_api_request_docstring``, and writes the
    results to ``CARDS_OUT_ROOT/<variant id>/`` as ``hf_hub_community.md`` and
    ``hf_api_tool.py``.

    Returns:
        The directory the two files were written to.

    Raises:
        ValueError: Propagated from the replacement helpers when the base
            files do not contain the expected description line or docstring.
    """
    out_dir = CARDS_OUT_ROOT / variant['id']
    out_dir.mkdir(parents=True, exist_ok=True)

    card_text = replace_card_description(
        base_card_path.read_text(encoding='utf-8'),
        variant['card_description'],
    )
    tool_text = replace_hf_api_docstring(
        base_tool_path.read_text(encoding='utf-8'),
        variant['hf_api_request_docstring'],
    )

    for filename, content in (
        ('hf_hub_community.md', card_text),
        ('hf_api_tool.py', tool_text),
    ):
        (out_dir / filename).write_text(content, encoding='utf-8')

    return out_dir
| |
|
| |
|
def write_indirect_router_card(variant_dir: Path) -> None:
    """Write the wrapper-agent card used for ``--indirect`` runs.

    The card is named ``<INDIRECT_ROUTER_NAME>.md`` and lists
    ``hf_hub_community`` as its only sub-agent.
    """
    # NOTE(review): the source this was recovered from had its whitespace
    # collapsed, so any indentation inside the frontmatter is unknown; the
    # lines are reproduced without leading spaces.
    card_path = variant_dir / f'{INDIRECT_ROUTER_NAME}.md'
    card_text = f"""---
name: {INDIRECT_ROUTER_NAME}
model: gpt-oss
skills: []
agents:
- hf_hub_community
---
Use the hf_hub_community sub-agent tool to fulfill the user's request.
"""
    card_path.write_text(card_text, encoding='utf-8')
| |
|
| |
|
| | def _extract_session_observations(result_path: Path) -> dict[str, Any]: |
| | data = json.loads(result_path.read_text(encoding='utf-8')) |
| | messages = data.get('messages', []) if isinstance(data, dict) else [] |
| |
|
| | endpoints: list[str] = [] |
| | tool_names: list[str] = [] |
| | merged_parts: list[str] = [] |
| |
|
| | for msg in messages: |
| | if not isinstance(msg, dict): |
| | continue |
| |
|
| | if msg.get('role') == 'assistant': |
| | for item in msg.get('content', []) or []: |
| | if isinstance(item, dict) and item.get('type') == 'text' and item.get('text'): |
| | merged_parts.append(str(item['text'])) |
| |
|
| | channels = msg.get('channels') or {} |
| | for ch_name in ('reasoning',): |
| | for item in channels.get(ch_name, []) or []: |
| | if isinstance(item, dict) and item.get('text'): |
| | merged_parts.append(str(item['text'])) |
| |
|
| | tc_map = msg.get('tool_calls') or {} |
| | if isinstance(tc_map, dict): |
| | for tc in tc_map.values(): |
| | params = (tc or {}).get('params', {}) if isinstance(tc, dict) else {} |
| | name = params.get('name') if isinstance(params, dict) else None |
| | args = params.get('arguments', {}) if isinstance(params, dict) else {} |
| |
|
| | if isinstance(name, str): |
| | tool_names.append(name) |
| | merged_parts.append(f'tool call - {name}') |
| |
|
| | if isinstance(args, dict): |
| | ep = args.get('endpoint') |
| | if isinstance(ep, str): |
| | endpoints.append(ep) |
| | merged_parts.append(json.dumps(args, ensure_ascii=False)) |
| |
|
| | if msg.get('role') == 'user': |
| | tr_map = msg.get('tool_results') or {} |
| | if isinstance(tr_map, dict): |
| | for tr in tr_map.values(): |
| | for item in (tr or {}).get('content', []) or []: |
| | if isinstance(item, dict) and item.get('type') == 'text' and item.get('text'): |
| | merged_parts.append(str(item['text'])) |
| |
|
| | return { |
| | 'endpoints': endpoints, |
| | 'tool_names': tool_names, |
| | 'merged_from_result': '\n'.join(merged_parts).strip(), |
| | } |
| |
|
| |
|
def run_prompt(
    prompt: str,
    model: str,
    cards_dir: Path,
    agent_name: str,
    timeout_sec: int,
    result_path: Path,
) -> dict[str, Any]:
    """Run one prompt through ``fast-agent go`` and collect what happened.

    Args:
        prompt: Message passed with ``-m``.
        model: Model id passed with ``--model``.
        cards_dir: Directory passed with ``--agent-cards``.
        agent_name: Agent passed with ``--agent``.
        timeout_sec: Timeout for the subprocess, in seconds.
        result_path: File passed with ``--results``; its parent directory is
            created if needed.

    Returns:
        A dict with ``returncode``, ANSI-stripped ``stdout`` and ``stderr``,
        ``merged`` (text assembled from the results file), ``merged_console``
        (stdout and stderr joined), ``endpoints``, ``tool_names``,
        ``has_tool_call`` and ``result_file``.

    Raises:
        RuntimeError: If the results file does not exist after the run.
        subprocess.TimeoutExpired: If the run exceeds *timeout_sec*.
    """
    result_path.parent.mkdir(parents=True, exist_ok=True)
    # Remove any results file left by an earlier run at the same path.
    # Otherwise a run that fails before writing its own results would be
    # scored against the stale file.
    result_path.unlink(missing_ok=True)

    cmd = [
        'fast-agent', 'go',
        '--no-env',
        '--model', model,
        '--agent-cards', str(cards_dir),
        '--agent', agent_name,
        '--results', str(result_path),
        '-m', prompt,
    ]

    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout_sec)
    out = strip_ansi(proc.stdout or '')
    err = strip_ansi(proc.stderr or '')
    merged_console = (out + '\n' + err).strip()

    if not result_path.exists():
        raise RuntimeError(f'Expected --results file not written: {result_path}')

    parsed = _extract_session_observations(result_path)
    tool_names = parsed['tool_names']

    return {
        'returncode': proc.returncode,
        'stdout': out,
        'stderr': err,
        'merged': parsed['merged_from_result'],
        'merged_console': merged_console,
        'endpoints': parsed['endpoints'],
        'tool_names': tool_names,
        'has_tool_call': bool(tool_names),
        'result_file': str(result_path),
    }
| |
|
| |
|
def eval_first_call(case_id: int, row: dict[str, Any]) -> bool | None:
    """Judge whether a run's first action matches the expectation for a case.

    Args:
        case_id: Key into ``FIRST_ENDPOINT_EXPECTED``.
        row: Run result; ``has_tool_call`` and ``endpoints`` are read.

    Returns:
        ``None`` when the case has no rule.  For ``no_tool_call`` rules, True
        exactly when no tool call was made.  Otherwise True when the first
        endpoint matches any of the rule's ``any`` regexes (searched, not
        anchored), and False when there is no endpoint at all.
    """
    rule = FIRST_ENDPOINT_EXPECTED.get(case_id)
    if not rule:
        return None

    if rule.get('no_tool_call'):
        return not row['has_tool_call']

    endpoints = row['endpoints']
    if not endpoints:
        return False
    first = endpoints[0]
    if first is None:
        return False

    for pattern in rule.get('any', []):
        if re.search(pattern, first):
            return True
    return False
| |
|
| |
|
def summarize(rows: list[RunRow]) -> list[dict[str, Any]]:
    """Aggregate run rows into one summary record per (variant, model).

    Each record carries the case count, the share of runs with return code 0,
    the share of runs that made a tool call, the mean number of endpoint
    calls, the share of correct first calls among the runs that were judged
    (``None`` if none were), and the mean total score among scored runs
    (``None`` if none were).  Records are ordered by (variant, model).
    """
    buckets: dict[tuple[str, str], list[RunRow]] = {}
    for row in rows:
        key = (row.variant, row.model)
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(row)

    report: list[dict[str, Any]] = []
    for key in sorted(buckets):
        variant, model = key
        members = buckets[key]
        count = len(members)  # at least 1: a bucket exists only once it has a row

        succeeded = len([m for m in members if m.returncode == 0])
        used_tool = len([m for m in members if m.has_tool_call])
        total_calls = sum(m.endpoint_calls for m in members)

        # Only runs that were actually judged count towards the first-call rate.
        judged = [m.first_call_correct for m in members if m.first_call_correct is not None]
        first_ok = None
        if judged:
            first_ok = round(sum(1 for verdict in judged if verdict) / len(judged), 4)

        scored = [m.score_total for m in members if m.score_total is not None]
        mean_score = round(statistics.mean(scored), 3) if scored else None

        report.append({
            'variant': variant,
            'model': model,
            'n_cases': count,
            'success_rate': round(succeeded / count, 4),
            'tool_use_rate': round(used_tool / count, 4),
            'avg_endpoint_calls': round(total_calls / count, 3),
            'first_call_ok_rate': first_ok,
            'avg_score_total': mean_score,
        })
    return report
| |
|
| | def _binom_two_sided_pvalue(k: int, n: int, p: float = 0.5) -> float | None: |
| | """Exact two-sided binomial p-value for small n (sufficient for this harness).""" |
| | if n <= 0: |
| | return None |
| | if k < 0 or k > n: |
| | return None |
| | |
| | probs = [math.comb(n, i) * (p ** i) * ((1 - p) ** (n - i)) for i in range(n + 1)] |
| | observed = probs[k] |
| | pval = sum(pr for pr in probs if pr <= observed + 1e-12) |
| | return min(1.0, float(pval)) |
| |
|
| |
|
def pairwise_analysis(rows: list[RunRow]) -> list[dict[str, Any]]:
    """Compare every pair of variants, model by model, on their shared cases.

    For each model and each alphabetically ordered variant pair (A, B), only
    the cases present for both variants are considered.  Two comparisons are
    reported:

    * ``first_call``: a 2x2 tally of first-call correctness over the cases
      where both runs were judged, plus a two-sided binomial p-value on the
      discordant cases (``None`` when there are none).
    * ``score_total``: win/loss/tie counts and the mean of ``B - A`` over the
      cases where both runs were scored.

    Pairs without any shared case are omitted.
    """
    # Last row wins if the same (model, variant, case) appears more than once.
    lookup: dict[tuple[str, str, int], RunRow] = {
        (row.model, row.variant, row.case_id): row for row in rows
    }
    model_names = sorted({row.model for row in rows})
    variant_names = sorted({row.variant for row in rows})

    results: list[dict[str, Any]] = []
    for model in model_names:
        model_cases = {row.case_id for row in rows if row.model == model}

        for va, vb in itertools.combinations(variant_names, 2):
            shared = sorted(
                case for case in model_cases
                if (model, va, case) in lookup and (model, vb, case) in lookup
            )
            if not shared:
                continue

            first_tally = {'a_true_b_false': 0, 'b_true_a_false': 0, 'both_true': 0, 'both_false': 0}
            score_tally = {'a_gt_b': 0, 'b_gt_a': 0, 'ties': 0}
            deltas: list[float] = []

            for case in shared:
                row_a = lookup[(model, va, case)]
                row_b = lookup[(model, vb, case)]

                ok_a, ok_b = row_a.first_call_correct, row_b.first_call_correct
                if ok_a is not None and ok_b is not None:
                    if ok_a and ok_b:
                        first_tally['both_true'] += 1
                    elif ok_a:
                        first_tally['a_true_b_false'] += 1
                    elif ok_b:
                        first_tally['b_true_a_false'] += 1
                    else:
                        first_tally['both_false'] += 1

                total_a, total_b = row_a.score_total, row_b.score_total
                if total_a is not None and total_b is not None:
                    deltas.append(float(total_b - total_a))
                    if total_a > total_b:
                        score_tally['a_gt_b'] += 1
                    elif total_b > total_a:
                        score_tally['b_gt_a'] += 1
                    else:
                        score_tally['ties'] += 1

            # Sign-test style check: only cases where exactly one variant was
            # correct carry information about which variant is better.
            discordant = first_tally['a_true_b_false'] + first_tally['b_true_a_false']
            p_value = None
            if discordant > 0:
                favored = max(first_tally['a_true_b_false'], first_tally['b_true_a_false'])
                p_value = _binom_two_sided_pvalue(favored, discordant, 0.5)
                if p_value is not None:
                    p_value = round(p_value, 6)

            mean_delta = round(statistics.mean(deltas), 4) if deltas else None

            results.append({
                'model': model,
                'variant_a': va,
                'variant_b': vb,
                'n_common_cases': len(shared),
                'first_call': {
                    **first_tally,
                    'discordant': discordant,
                    'two_sided_binom_p': p_value,
                },
                'score_total': {
                    **score_tally,
                    'avg_delta_b_minus_a': mean_delta,
                },
            })

    return results
| |
|
| |
|
| |
|
def compute_rankings(summary: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Return ``(global_variant_rank, per_model_winners)``.

    ``global_variant_rank`` has one entry per variant whose metrics are the
    plain mean of that variant's summary records, ignoring ``None`` values;
    ``models_covered`` is the number of records averaged.
    ``per_model_winners`` has one entry per model, naming its best variant.

    Ranking priority: first_call_ok_rate desc, avg_score_total desc,
    success_rate desc, avg_endpoint_calls asc, then variant name.  Missing
    values sort last within their criterion.
    """
    metric_fields = ('first_call_ok_rate', 'avg_score_total', 'success_rate', 'avg_endpoint_calls')

    def _mean_of(entries: list[dict[str, Any]], field: str) -> float | None:
        present = [entry[field] for entry in entries if entry.get(field) is not None]
        return (sum(present) / len(present)) if present else None

    def _rank_key(entry: dict[str, Any]):
        first_ok, score, success, calls = (entry[field] for field in metric_fields)
        # "Higher is better" metrics are negated so an ascending sort puts the
        # best first; a missing value stands in as -1.0 before negation.
        return (
            -(-1.0 if first_ok is None else first_ok),
            -(-1.0 if score is None else score),
            -(-1.0 if success is None else success),
            1e9 if calls is None else calls,
            entry['variant'],
        )

    records_by_variant: dict[str, list[dict[str, Any]]] = {}
    records_by_model: dict[str, list[dict[str, Any]]] = {}
    for record in summary:
        records_by_variant.setdefault(record['variant'], []).append(record)
        records_by_model.setdefault(record['model'], []).append(record)

    global_rank = [
        {
            'variant': variant,
            'models_covered': len(records),
            **{field: _mean_of(records, field) for field in metric_fields},
        }
        for variant, records in records_by_variant.items()
    ]
    global_rank.sort(key=_rank_key)

    per_model_winners: list[dict[str, Any]] = []
    for model in sorted(records_by_model):
        best = min(records_by_model[model], key=_rank_key)
        per_model_winners.append({
            'model': model,
            'winner_variant': best['variant'],
            **{field: best[field] for field in metric_fields},
        })

    return global_rank, per_model_winners
| |
|
| |
|
def write_outputs(rows: list[RunRow], summary: list[dict[str, Any]], pairwise: list[dict[str, Any]], out_dir: Path) -> None:
    """Write every report file for a run into *out_dir*.

    Creates the directory if needed and writes, in this order: the detailed,
    summary and pairwise JSON files, the ranking JSON (computed here from
    *summary*), the summary and pairwise CSV files, and finally a Markdown
    report containing the summary, global ranking, per-model winners and
    pairwise tables.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    def _write_json(filename: str, payload: Any) -> None:
        (out_dir / filename).write_text(json.dumps(payload, indent=2), encoding='utf-8')

    def _md_row(cells: Any) -> str:
        return '| ' + ' | '.join(str(cell) for cell in cells) + ' |'

    # ---- JSON ---------------------------------------------------------------
    detailed_payload = []
    for r in rows:
        detailed_payload.append({
            'case_id': r.case_id,
            'prompt': r.prompt,
            'variant': r.variant,
            'model': r.model,
            'returncode': r.returncode,
            'has_tool_call': r.has_tool_call,
            'endpoint_calls': r.endpoint_calls,
            'first_endpoint': r.first_endpoint,
            'first_call_correct': r.first_call_correct,
            'score_total': r.score_total,
            'score': {
                'endpoint': r.score_endpoint,
                'efficiency': r.score_efficiency,
                'reasoning': r.score_reasoning,
                'safety': r.score_safety,
                'clarity': r.score_clarity,
            },
            'result_file': r.result_file,
            'merged': r.merged,
        })

    _write_json('tool_description_ab_detailed.json', detailed_payload)
    _write_json('tool_description_ab_summary.json', summary)
    _write_json('tool_description_ab_pairwise.json', pairwise)

    global_rank, per_model_winners = compute_rankings(summary)
    _write_json(
        'tool_description_ab_ranking.json',
        {'global_rank': global_rank, 'per_model_winners': per_model_winners},
    )

    # ---- CSV ----------------------------------------------------------------
    summary_columns = [
        'variant', 'model', 'n_cases', 'success_rate', 'tool_use_rate',
        'avg_endpoint_calls', 'first_call_ok_rate', 'avg_score_total',
    ]
    with (out_dir / 'tool_description_ab_summary.csv').open('w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=summary_columns)
        writer.writeheader()
        writer.writerows(summary)

    pairwise_columns = [
        'model', 'variant_a', 'variant_b', 'n_common_cases',
        'first_a_true_b_false', 'first_b_true_a_false', 'first_discordant', 'first_two_sided_binom_p',
        'score_a_gt_b', 'score_b_gt_a', 'score_ties', 'score_avg_delta_b_minus_a',
    ]
    # The nested pairwise records are flattened into one CSV row each.
    flat_pairwise = [
        {
            'model': pair['model'],
            'variant_a': pair['variant_a'],
            'variant_b': pair['variant_b'],
            'n_common_cases': pair['n_common_cases'],
            'first_a_true_b_false': pair['first_call']['a_true_b_false'],
            'first_b_true_a_false': pair['first_call']['b_true_a_false'],
            'first_discordant': pair['first_call']['discordant'],
            'first_two_sided_binom_p': pair['first_call']['two_sided_binom_p'],
            'score_a_gt_b': pair['score_total']['a_gt_b'],
            'score_b_gt_a': pair['score_total']['b_gt_a'],
            'score_ties': pair['score_total']['ties'],
            'score_avg_delta_b_minus_a': pair['score_total']['avg_delta_b_minus_a'],
        }
        for pair in pairwise
    ]
    with (out_dir / 'tool_description_ab_pairwise.csv').open('w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=pairwise_columns)
        writer.writeheader()
        writer.writerows(flat_pairwise)

    # ---- Markdown -------------------------------------------------------------
    md = [
        '# Tool Description A/B Evaluation Summary',
        '',
        '| Variant | Model | Cases | Success | Tool-use | Avg endpoint calls | First-call OK | Avg score |',
        '|---|---|---:|---:|---:|---:|---:|---:|',
    ]
    md.extend(
        _md_row(s[column] for column in (
            'variant', 'model', 'n_cases', 'success_rate', 'tool_use_rate',
            'avg_endpoint_calls', 'first_call_ok_rate', 'avg_score_total',
        ))
        for s in summary
    )

    md += [
        '',
        '## Best overall (easy read)',
        '',
        '| Rank | Variant | Models covered | First-call OK | Avg score | Success | Avg endpoint calls |',
        '|---:|---|---:|---:|---:|---:|---:|',
    ]
    for rank, g in enumerate(global_rank, start=1):
        md.append(_md_row((
            rank, g['variant'], g['models_covered'], g['first_call_ok_rate'],
            g['avg_score_total'], g['success_rate'], g['avg_endpoint_calls'],
        )))

    md += [
        '',
        '## Per-model winner',
        '',
        '| Model | Winner variant | First-call OK | Avg score | Success | Avg endpoint calls |',
        '|---|---|---:|---:|---:|---:|',
    ]
    for winner in per_model_winners:
        md.append(_md_row((
            winner['model'], winner['winner_variant'], winner['first_call_ok_rate'],
            winner['avg_score_total'], winner['success_rate'], winner['avg_endpoint_calls'],
        )))

    md += [
        '',
        '## Pairwise variant comparisons (per model)',
        '',
        '| Model | A | B | Cases | First-call A>B | First-call B>A | p-value (binom) | Score A>B | Score B>A | Ties | Avg Δ (B-A) |',
        '|---|---|---|---:|---:|---:|---:|---:|---:|---:|---:|',
    ]
    for pair in pairwise:
        first, score = pair['first_call'], pair['score_total']
        md.append(_md_row((
            pair['model'], pair['variant_a'], pair['variant_b'], pair['n_common_cases'],
            first['a_true_b_false'], first['b_true_a_false'], first['two_sided_binom_p'],
            score['a_gt_b'], score['b_gt_a'], score['ties'], score['avg_delta_b_minus_a'],
        )))

    (out_dir / 'tool_description_ab_summary.md').write_text('\n'.join(md) + '\n', encoding='utf-8')
| |
|
| |
|
def main() -> None:
    """Command-line entry point for the A/B harness.

    For every variant, model and selected prompt this prepares the variant's
    cards, runs the prompt through ``run_prompt``, judges the first call and
    (when a scorer module is available and ``--indirect`` is not set) scores
    the run.  With ``--append`` the new rows are merged over previously saved
    ones, new rows winning on equal ``(case_id, variant, model)``.  Finally
    the summary, pairwise analysis and all report files are written.

    Scorer failures are reported on stderr and leave that row's scores as
    ``None``; they do not abort the run.

    Raises:
        FileNotFoundError: If the base card or base tool file is missing.
    """
    import sys  # local import: only needed for the stderr warning below

    ap = argparse.ArgumentParser(description='A/B test hf_api_request tool description variants across models')
    ap.add_argument('--models', default='gpt-oss', help='Comma-separated model IDs (e.g. gpt-oss,gpt-5-mini)')
    ap.add_argument('--base-cards-dir', type=Path, default=BASE_CARDS_DIR, help='Directory containing hf_hub_community.md and hf_api_tool.py used as AB base')
    ap.add_argument('--prompts', type=Path, default=PROMPTS_FILE)
    ap.add_argument('--variants', type=Path, default=VARIANTS_FILE)
    ap.add_argument('--start', type=int, default=1)
    ap.add_argument('--end', type=int, default=10)
    ap.add_argument('--timeout', type=int, default=240)
    ap.add_argument('--out-dir', type=Path, default=OUT_DIR)
    ap.add_argument('--raw-results-dir', type=Path, default=None, help='Where to store fast-agent --results JSON files')
    ap.add_argument('--indirect', action='store_true', help='Run via a wrapper agent that exposes only hf_hub_community as a sub-agent tool')
    ap.add_argument('--append', action='store_true', help='Append/merge with existing detailed results in out-dir')
    args = ap.parse_args()

    prompts = load_prompts(args.prompts)
    # Case ids are the 1-based positions of the prompts in the file.
    indexed_prompts = [(i, p) for i, p in enumerate(prompts, start=1) if args.start <= i <= args.end]
    variants = load_variants(args.variants)
    models = [m.strip() for m in args.models.split(',') if m.strip()]
    raw_results_dir = args.raw_results_dir or (args.out_dir / 'raw_results')
    base_card_path = args.base_cards_dir / 'hf_hub_community.md'
    base_tool_path = args.base_cards_dir / 'hf_api_tool.py'

    if not base_card_path.exists():
        raise FileNotFoundError(f'Base card not found: {base_card_path}')
    if not base_tool_path.exists():
        raise FileNotFoundError(f'Base tool not found: {base_tool_path}')

    # Scoring and first-call evaluation are both skipped for indirect runs.
    scorer = None if args.indirect else maybe_import_base_scorer()
    score_fields = ('total', 'endpoint', 'efficiency', 'reasoning', 'safety', 'clarity')

    all_rows: list[RunRow] = []

    for variant in variants:
        cards_dir = prepare_variant_cards(
            variant,
            base_card_path=base_card_path,
            base_tool_path=base_tool_path,
        )
        if args.indirect:
            write_indirect_router_card(cards_dir)
        target_agent = INDIRECT_ROUTER_NAME if args.indirect else 'hf_hub_community'
        print(f"\n[variant] {variant['id']} -> {cards_dir}")

        for model in models:
            print(f" [model] {model}")
            # Model ids may contain '/', which must not create subdirectories.
            safe_model = model.replace('/', '_')
            for case_id, prompt in indexed_prompts:
                result_path = raw_results_dir / variant['id'] / safe_model / f'case_{case_id:02d}.json'
                r = run_prompt(
                    prompt,
                    model=model,
                    cards_dir=cards_dir,
                    agent_name=target_agent,
                    timeout_sec=args.timeout,
                    result_path=result_path,
                )
                first_ok = None if args.indirect else eval_first_call(case_id, r)

                scores: dict[str, Any] = dict.fromkeys(score_fields)
                if scorer is not None:
                    try:
                        ev = scorer.score_case(case_id, {
                            'merged': r['merged'],
                            'endpoints': r['endpoints'],
                            'returncode': r['returncode'],
                            'stdout': r['stdout'],
                            'has_tool_call': r['has_tool_call'],
                        })
                        scores = {field: getattr(ev, field) for field in score_fields}
                    except Exception as exc:
                        # Scoring stays best-effort, but the failure is made
                        # visible so that missing scores can be explained.
                        print(
                            f" ! scoring failed for case {case_id} "
                            f"({variant['id']} / {model}): {exc!r}",
                            file=sys.stderr,
                        )

                row = RunRow(
                    case_id=case_id,
                    prompt=prompt,
                    variant=variant['id'],
                    model=model,
                    returncode=r['returncode'],
                    has_tool_call=r['has_tool_call'],
                    endpoint_calls=len(r['endpoints']),
                    first_endpoint=r['endpoints'][0] if r['endpoints'] else None,
                    first_call_correct=first_ok,
                    score_total=scores['total'],
                    score_endpoint=scores['endpoint'],
                    score_efficiency=scores['efficiency'],
                    score_reasoning=scores['reasoning'],
                    score_safety=scores['safety'],
                    score_clarity=scores['clarity'],
                    result_file=r.get('result_file'),
                    merged=r['merged'],
                )
                all_rows.append(row)
                print(
                    f" - case {case_id}: rc={row.returncode} calls={row.endpoint_calls} "
                    f"first_ok={row.first_call_correct} score={row.score_total}"
                )

    if args.append:
        existing = load_existing_rows(args.out_dir)
        merged: dict[tuple[int, str, str], RunRow] = {row_key(r): r for r in existing}
        # Fresh rows replace saved rows that share the same key.
        for r in all_rows:
            merged[row_key(r)] = r
        all_rows = list(merged.values())

    summary = summarize(all_rows)
    pairwise = pairwise_analysis(all_rows)
    write_outputs(all_rows, summary, pairwise, args.out_dir)

    print('\nWrote outputs:')
    print(f"- {args.out_dir / 'tool_description_ab_detailed.json'}")
    print(f"- {args.out_dir / 'tool_description_ab_summary.json'}")
    print(f"- {args.out_dir / 'tool_description_ab_summary.csv'}")
    print(f"- {args.out_dir / 'tool_description_ab_summary.md'}")
    print(f"- {args.out_dir / 'tool_description_ab_pairwise.json'}")
    print(f"- {args.out_dir / 'tool_description_ab_pairwise.csv'}")
    print(f"- {args.out_dir / 'tool_description_ab_ranking.json'}")
| |
|
| |
|
# Run the harness only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
| |
|