"""
Comprehensive CodeSearchNet Analysis and Reporting Script.
This script provides a complete CodeSearchNet evaluation pipeline that includes:
1. Model evaluation results analysis
2. Peer model comparison analysis
3. Advanced visualizations and charts
4. Leaderboard comparison and ranking analysis
5. Comprehensive Markdown report generation (REPORT.md)
6. Performance efficiency analysis
7. Language-specific performance analysis
Features:
- CodeSearchNet-style scoring (NDCG@k, MRR, Recall@k metrics)
- Comparison with peer code-specialized models
- Model efficiency metrics (performance per parameter)
- Interactive visualizations with Plotly and Matplotlib
- Professional charts for README integration
- Statistical analysis of results across programming languages
Usage:
python analyze.py --results-dir results/ --model-name my_model
distiller analyze --results-dir evaluation_results
"""
import json
import logging
import time
from pathlib import Path
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from .config import directories
# Optional Plotly import with fallback
PLOTLY_AVAILABLE = True
try:
import plotly.graph_objects as go
except ImportError:
PLOTLY_AVAILABLE = False
# Set plotting style
try:
plt.style.use("seaborn-v0_8")
except OSError:
plt.style.use("seaborn") # Fallback for older matplotlib versions
sns.set_palette("husl")
# =============================================================================
# CONFIGURATION
# =============================================================================
# Constants
MIN_SCORES_FOR_STATS = 2
HIGH_PERFORMANCE_THRESHOLD = 0.3
MEDIUM_PERFORMANCE_THRESHOLD = 0.2
# Model Configuration
MODEL_NAME = "code_model2vec_analysis" # Generic name for multi-model analysis
ORIGINAL_MODEL_NAME = "Alibaba-NLP/gte-Qwen2-7B-instruct"
OUTPUT_DIR = Path("analysis_results")
IMAGES_DIR = Path("analysis_charts")
REPORT_FILE = Path("REPORT.md") # Changed from README.md
# Local directories for results - using standardized directories from config
DEFAULT_EVALUATION_DIR = directories.evaluation_results
DEFAULT_BENCHMARK_DIR = directories.benchmark_results
# CodeSearchNet Languages
CODE_LANGUAGES = ["python", "javascript", "java", "php", "ruby", "go"]
# Model name mapping from the default models in evaluate.py and benchmark.py
MODEL_NAME_MAPPING = {
# File names to display names and HuggingFace links
"all-MiniLM-L6-v2": {
"name": "sentence-transformers/all-MiniLM-L6-v2",
"link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
},
"all-mpnet-base-v2": {
"name": "sentence-transformers/all-mpnet-base-v2",
"link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2",
},
"paraphrase-MiniLM-L6-v2": {
"name": "sentence-transformers/paraphrase-MiniLM-L6-v2",
"link": "https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2",
},
"codebert-base": {"name": "microsoft/codebert-base", "link": "https://huggingface.co/microsoft/codebert-base"},
"graphcodebert-base": {
"name": "microsoft/graphcodebert-base",
"link": "https://huggingface.co/microsoft/graphcodebert-base",
},
"CodeBERTa-small-v1": {
"name": "huggingface/CodeBERTa-small-v1",
"link": "https://huggingface.co/huggingface/CodeBERTa-small-v1",
},
"all-MiniLM-L12-v2": {
"name": "sentence-transformers/all-MiniLM-L12-v2",
"link": "https://huggingface.co/sentence-transformers/all-MiniLM-L12-v2",
},
"potion-base-8M": {"name": "minishlab/potion-base-8M", "link": "https://huggingface.co/minishlab/potion-base-8M"},
"potion-retrieval-32M": {
"name": "minishlab/potion-retrieval-32M",
"link": "https://huggingface.co/minishlab/potion-retrieval-32M",
},
"codet5-base": {"name": "Salesforce/codet5-base", "link": "https://huggingface.co/Salesforce/codet5-base"},
"gte-Qwen2-1.5B-instruct": {
"name": "Alibaba-NLP/gte-Qwen2-1.5B-instruct",
"link": "https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct",
},
"bge-m3": {"name": "BAAI/bge-m3", "link": "https://huggingface.co/BAAI/bge-m3"},
"jina-embeddings-v3": {
"name": "jinaai/jina-embeddings-v3",
"link": "https://huggingface.co/jinaai/jina-embeddings-v3",
},
"nomic-embed-text-v2-moe": {
"name": "nomic-ai/nomic-embed-text-v2-moe",
"link": "https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe",
},
"Qodo-Embed-1-1.5B": {"name": "Qodo/Qodo-Embed-1-1.5B", "link": "https://huggingface.co/Qodo/Qodo-Embed-1-1.5B"},
"Reason-ModernColBERT": {
"name": "lightonai/Reason-ModernColBERT",
"link": "https://huggingface.co/lightonai/Reason-ModernColBERT",
},
"Linq-Embed-Mistral": {
"name": "Linq-AI-Research/Linq-Embed-Mistral",
"link": "https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral",
},
"bge-code-v1": {"name": "BAAI/bge-code-v1", "link": "https://huggingface.co/BAAI/bge-code-v1"},
"SFR-Embedding-Code-2B_R": {
"name": "Salesforce/SFR-Embedding-Code-2B_R",
"link": "https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R",
},
}
# Reverse mapping for lookups - using just the names
DISPLAY_NAME_TO_FILE = {v["name"]: k for k, v in MODEL_NAME_MAPPING.items()}
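# e.g. DISPLAY_NAME_TO_FILE["microsoft/codebert-base"] == "codebert-base"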
# Peer models for comparison (code-specialized models)
PEER_MODELS = {
"sentence-transformers/all-MiniLM-L6-v2": {
"overall_ndcg": 0.25,
"type": "General",
"link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
},
"microsoft/codebert-base": {
"overall_ndcg": 0.32,
"type": "Code-Specific",
"link": "https://huggingface.co/microsoft/codebert-base",
},
"microsoft/graphcodebert-base": {
"overall_ndcg": 0.35,
"type": "Code-Specific",
"link": "https://huggingface.co/microsoft/graphcodebert-base",
},
"huggingface/CodeBERTa-small-v1": {
"overall_ndcg": 0.28,
"type": "Code-Specific",
"link": "https://huggingface.co/huggingface/CodeBERTa-small-v1",
},
"sentence-transformers/all-mpnet-base-v2": {
"overall_ndcg": 0.27,
"type": "General",
"link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2",
},
}
# Model specifications for efficiency analysis
MODEL_SPECS = {
"sentence-transformers/all-MiniLM-L6-v2": {
"parameters": 22.7,
"size_mb": 90,
"link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
},
"microsoft/codebert-base": {
"parameters": 125.0,
"size_mb": 500,
"link": "https://huggingface.co/microsoft/codebert-base",
},
"microsoft/graphcodebert-base": {
"parameters": 125.0,
"size_mb": 500,
"link": "https://huggingface.co/microsoft/graphcodebert-base",
},
"huggingface/CodeBERTa-small-v1": {
"parameters": 84.0,
"size_mb": 340,
"link": "https://huggingface.co/huggingface/CodeBERTa-small-v1",
},
"sentence-transformers/all-mpnet-base-v2": {
"parameters": 109.0,
"size_mb": 440,
"link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2",
},
"Alibaba-NLP/gte-Qwen2-1.5B-instruct": {
"parameters": 1500.0,
"size_mb": 3000,
"link": "https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct",
},
}
# Distilled model specifications
DISTILLED_MODEL_SPECS = {
"parameters": 39.0, # Model2Vec parameters
"size_mb": 149.0, # Actual model size
"dimensions": 256, # Model2Vec dimensions
"original_dimensions": 3584,
"distillation_method": "Model2Vec",
"training_dataset": "CodeSearchNet",
}
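# Rough consistency check for the figures above: ~152k vocabulary tokens x 256 dims ~= 39M parameters,
# distilled from the 3584-dimensional teacher named in ORIGINAL_MODEL_NAME.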
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def setup_directories(base_path: Path | None = None) -> tuple[Path, Path, Path]:
"""Create necessary directories and return their paths."""
if base_path:
output_dir = base_path / "analysis_results"
images_dir = base_path / "analysis_results" / "charts"
reports_dir = base_path / "analysis_results" / "reports"
else:
output_dir = Path() # Use current directory
images_dir = IMAGES_DIR # Use analysis_charts
reports_dir = Path() # Use current directory for reports
# Only create directories that we actually use
images_dir.mkdir(parents=True, exist_ok=True)
return output_dir, images_dir, reports_dir
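# Illustrative return values:
#   setup_directories(Path("out")) -> (out/analysis_results, out/analysis_results/charts, out/analysis_results/reports)
#   setup_directories()            -> (Path(), IMAGES_DIR, Path())  # only the images directory is created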
def extract_model_name_from_filename(filename: str) -> str:
"""Extract and map model name from filename."""
# Remove prefixes and extensions
name = filename.replace("codesearchnet_eval_", "").replace("benchmark_", "").replace(".json", "")
# Check if it's in our mapping
if name in MODEL_NAME_MAPPING:
return MODEL_NAME_MAPPING[name]["name"]
# Try to find partial matches
for file_key, model_info in MODEL_NAME_MAPPING.items():
if file_key in name or name in file_key:
return model_info["name"]
# If no mapping found, return the cleaned name
return name
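# Illustrative behaviour (derived from MODEL_NAME_MAPPING above):
#   extract_model_name_from_filename("codesearchnet_eval_codebert-base.json") -> "microsoft/codebert-base"
#   extract_model_name_from_filename("benchmark_some-unmapped-model.json")    -> "some-unmapped-model"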
def get_model_link(model_name: str) -> str:
"""Get HuggingFace link for a model."""
# First try direct lookup by file key
for model_info in MODEL_NAME_MAPPING.values():
if model_info["name"] == model_name:
return model_info["link"]
# Try partial matches
for model_info in MODEL_NAME_MAPPING.values():
if model_name.lower() in model_info["name"].lower() or model_info["name"].lower() in model_name.lower():
return model_info["link"]
# If no mapping found, construct link from model name
if "/" in model_name:
return f"https://huggingface.co/{model_name}"
return ""
def format_model_with_link(model_name: str) -> str:
"""Format model name with markdown link."""
link = get_model_link(model_name)
if link:
return f"[{model_name}]({link})"
return model_name
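# e.g. format_model_with_link("BAAI/bge-m3") -> "[BAAI/bge-m3](https://huggingface.co/BAAI/bge-m3)";
# unmapped "org/model" names fall back to a constructed https://huggingface.co/org/model link.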
def get_teacher_model_info(model_display_name: str) -> tuple[str, str]:
"""Extract teacher model name and link from distilled model display name."""
# Mapping from model display patterns to teacher models
teacher_mapping = {
"all_MiniLM_L6_v2": (
"sentence-transformers/all-MiniLM-L6-v2",
"https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
),
"all_mpnet_base_v2": (
"sentence-transformers/all-mpnet-base-v2",
"https://huggingface.co/sentence-transformers/all-mpnet-base-v2",
),
"paraphrase_MiniLM_L6_v2": (
"sentence-transformers/paraphrase-MiniLM-L6-v2",
"https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2",
),
"codebert_base": ("microsoft/codebert-base", "https://huggingface.co/microsoft/codebert-base"),
"graphcodebert_base": ("microsoft/graphcodebert-base", "https://huggingface.co/microsoft/graphcodebert-base"),
"gte_Qwen2_1.5B_instruct": (
"Alibaba-NLP/gte-Qwen2-1.5B-instruct",
"https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct",
),
"bge_m3": ("BAAI/bge-m3", "https://huggingface.co/BAAI/bge-m3"),
"jina_embeddings_v2_base_code": (
"jina-embeddings-v2-base-code",
"https://huggingface.co/jina-embeddings-v2-base-code",
),
"jina_embeddings_v3": ("jinaai/jina-embeddings-v3", "https://huggingface.co/jinaai/jina-embeddings-v3"),
"nomic_embed_text_v2_moe": (
"nomic-ai/nomic-embed-text-v2-moe",
"https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe",
),
"Qodo_Embed_1_1.5B": ("Qodo/Qodo-Embed-1-1.5B", "https://huggingface.co/Qodo/Qodo-Embed-1-1.5B"),
"Reason_ModernColBERT": (
"lightonai/Reason-ModernColBERT",
"https://huggingface.co/lightonai/Reason-ModernColBERT",
),
"Linq_Embed_Mistral": (
"Linq-AI-Research/Linq-Embed-Mistral",
"https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral",
),
"bge_code_v1": ("BAAI/bge-code-v1", "https://huggingface.co/BAAI/bge-code-v1"),
"SFR_Embedding_Code_2B_R": (
"Salesforce/SFR-Embedding-Code-2B_R",
"https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R",
),
}
for pattern, (teacher_name, teacher_link) in teacher_mapping.items():
if pattern in model_display_name:
return teacher_name, teacher_link
return "Unknown", ""
class CodeSearchNetAnalyzer:
"""Analyzer for CodeSearchNet evaluation results and performance benchmarks."""
def __init__(
self,
results_dir: str | None = None,
benchmark_dir: str | None = None,
images_dir: Path | None = None,
) -> None:
"""Initialize analyzer with results directories."""
self.results_dir = Path(results_dir) if results_dir else Path(DEFAULT_EVALUATION_DIR)
self.benchmark_dir = Path(benchmark_dir) if benchmark_dir else Path(DEFAULT_BENCHMARK_DIR)
self.images_dir = images_dir or IMAGES_DIR
self.results: list[dict[str, Any]] = []
self.benchmark_results: list[dict[str, Any]] = []
self.comparison_df: pd.DataFrame | None = None
self.benchmark_df: pd.DataFrame | None = None
self.model_specs: dict[str, dict[str, Any]] = {} # Store actual model specifications
def load_benchmark_results(self) -> None:
"""Load benchmark results from comprehensive evaluation files."""
logger.info("πŸ“Š Loading benchmark results from comprehensive evaluations...")
if not self.results_dir.exists():
logger.warning(f"Evaluation directory not found: {self.results_dir}")
return
logger.info(f"πŸ” Searching for comprehensive evaluation files in: {self.results_dir}")
# Look for both new comprehensive format and legacy formats
comprehensive_files = list(self.results_dir.glob("comprehensive_eval_*.json"))
legacy_files = list(self.results_dir.glob("codesearchnet_eval_*.json"))
all_files = comprehensive_files + legacy_files
logger.info(
f"πŸ“ Found {len(all_files)} evaluation files ({len(comprehensive_files)} comprehensive, {len(legacy_files)} legacy)"
)
for eval_file_path in all_files:
try:
logger.info(f"πŸ“– Loading: {eval_file_path.name}")
with eval_file_path.open() as f:
data = json.load(f)
if data is not None:
if not isinstance(data, dict):
logger.warning(f"⚠️ Skipping {eval_file_path.name} (not a dict)")
continue
# Extract benchmark data if available
benchmark_data = self._extract_benchmark_data(data, eval_file_path)
if benchmark_data:
self.benchmark_results.append(benchmark_data)
logger.info(f"βœ… Successfully loaded benchmark data: {benchmark_data['model_name']}")
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"❌ Failed to load {eval_file_path}: {e}")
logger.info(f"πŸ“Š Total benchmark results loaded: {len(self.benchmark_results)}")
if self.benchmark_results:
model_names = [r.get("model_name", "Unknown") for r in self.benchmark_results]
logger.info(f"🎯 Benchmark models found: {', '.join(model_names)}")
self._create_benchmark_dataframe()
def _extract_benchmark_data(self, data: dict, file_path: Path) -> dict[str, Any] | None:
"""Extract benchmark data from comprehensive evaluation results."""
# Check if this evaluation contains benchmark data
if data.get("benchmark_skipped", False):
return None
# Check for benchmark fields
if not any(key in data for key in ["size_metrics", "speed_benchmarks", "memory_benchmarks", "cpu_vs_gpu"]):
return None
# Extract model name
original_name = data.get("model_name") or "Unknown"
mapped_name = extract_model_name_from_filename(
file_path.stem.replace("comprehensive_eval_", "").replace("codesearchnet_eval_", "")
)
# Create benchmark result structure
result: dict[str, Any] = {
"model_name": mapped_name,
"original_model_name": original_name,
"size_metrics": data.get("size_metrics", {}),
"speed_benchmarks": data.get("speed_benchmarks", {}),
"memory_benchmarks": data.get("memory_benchmarks", {}),
"cpu_vs_gpu": data.get("cpu_vs_gpu", {}),
}
return result
def _create_benchmark_dataframe(self) -> None:
"""Create benchmark comparison DataFrame from results."""
if not self.benchmark_results:
return
benchmark_data = []
for result in self.benchmark_results:
model_name = result.get("model_name", "Unknown")
size_metrics = result.get("size_metrics", {})
speed_benchmarks = result.get("speed_benchmarks", {})
memory_benchmarks = result.get("memory_benchmarks", {})
cpu_vs_gpu = result.get("cpu_vs_gpu", {})
# Extract key metrics
row = {
"Model": model_name,
"Disk_Size_MB": size_metrics.get("disk_size_mb", 0),
"Parameters_M": size_metrics.get("parameters_millions", 0),
"Embedding_Dim": size_metrics.get("embedding_dim", 0),
"RAM_Usage_MB": size_metrics.get("ram_usage_mb", 0),
"GPU_Memory_MB": size_metrics.get("gpu_memory_mb", 0),
}
# Speed metrics (medium texts, batch 32)
if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
batch_32 = speed_benchmarks["medium"]["batch_32"]
row.update(
{
"Throughput_TextsPerSec": batch_32.get("texts_per_second", 0),
"Latency_MsPerText": batch_32.get("time_per_text_ms", 0),
"TokenSpeed_TokensPerSec": batch_32.get("tokens_per_second", 0),
}
)
# Memory scaling (batch 32)
if "batch_32" in memory_benchmarks:
batch_32_mem = memory_benchmarks["batch_32"]
if not batch_32_mem.get("oom", False) and "error" not in batch_32_mem:
row.update(
{
"Memory_Used_MB": batch_32_mem.get("memory_used_mb", 0),
"Memory_Per_Text_MB": batch_32_mem.get("memory_per_text_mb", 0),
}
)
# CPU vs GPU comparison
for device, metrics in cpu_vs_gpu.items():
if isinstance(metrics, dict) and "error" not in metrics:
device_key = f"{device.upper()}_TextsPerSec"
row[device_key] = metrics.get("texts_per_second", 0)
benchmark_data.append(row)
self.benchmark_df = pd.DataFrame(benchmark_data)
def analyze_our_model_specifications(self) -> None:
"""Analyze actual model specifications for our distilled models."""
logger.info("πŸ” Analyzing model specifications for our distilled models...")
# Look for our models in the code_model2vec/final directory
final_models_dir = Path("code_model2vec/final")
if not final_models_dir.exists():
logger.warning(f"Final models directory not found: {final_models_dir}")
return
# Find all our model directories
our_model_dirs = [
model_dir
for model_dir in final_models_dir.iterdir()
if model_dir.is_dir() and "code_model2vec" in model_dir.name
]
logger.info(f"πŸ“ Found {len(our_model_dirs)} distilled model directories")
for model_dir in our_model_dirs:
model_name = model_dir.name
logger.info(f"πŸ“Š Analyzing model: {model_name}")
try:
# Try to load the model and get specifications
from distiller.model2vec import StaticModel
model = StaticModel.from_pretrained(str(model_dir))
# Get model specifications
vocab_size = len(model.tokens)
embedding_dim = model.dim
total_params = vocab_size * embedding_dim
# Get file size information
model_file = model_dir / "model.safetensors"
disk_size_mb: float = 0.0
if model_file.exists():
disk_size_mb = float(model_file.stat().st_size / (1024 * 1024)) # Convert to MB
# Store specifications
self.model_specs[model_name] = {
"vocabulary_size": vocab_size,
"embedding_dimensions": embedding_dim,
"total_parameters": total_params,
"parameters_millions": total_params / 1_000_000,
"disk_size_mb": disk_size_mb,
"model_path": str(model_dir),
"analysis_successful": True,
}
logger.info(
f"βœ… {model_name}: {vocab_size:,} vocab, {embedding_dim} dims, {total_params:,} params ({total_params / 1_000_000:.1f}M)"
)
except Exception as e:
logger.warning(f"❌ Failed to analyze {model_name}: {e}")
self.model_specs[model_name] = {
"analysis_successful": False,
"error": str(e),
"model_path": str(model_dir),
}
logger.info(
f"πŸ“Š Successfully analyzed {len([s for s in self.model_specs.values() if s.get('analysis_successful', False)])} models"
)
def load_results(self) -> None:
"""Load evaluation results from local directory."""
logger.info("πŸ” Loading evaluation results...")
if not self.results_dir.exists():
logger.warning(f"Evaluation directory not found: {self.results_dir}")
return
logger.info(f"πŸ” Searching for evaluation files in: {self.results_dir}")
# Look for both new comprehensive format and legacy formats
comprehensive_files = list(self.results_dir.glob("comprehensive_eval_*.json"))
legacy_files = list(self.results_dir.glob("codesearchnet_eval_*.json"))
all_files = comprehensive_files + legacy_files
logger.info(
f"πŸ“ Found {len(all_files)} evaluation files ({len(comprehensive_files)} comprehensive, {len(legacy_files)} legacy)"
)
for json_file in all_files:
try:
logger.info(f"πŸ“– Loading: {json_file.name}")
with json_file.open() as f:
data = json.load(f)
if data is not None:
if not isinstance(data, dict):
logger.warning(f"⚠️ Skipping {json_file.name} (not a dict)")
continue
# Normalize data format for analysis
normalized_data = self._normalize_evaluation_data(data, json_file)
self.results.append(normalized_data)
logger.info(f"βœ… Successfully loaded: {normalized_data['model_name']}")
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"❌ Failed to load {json_file}: {e}")
logger.info(f"πŸ“Š Total loaded: {len(self.results)} model results")
if self.results:
model_names = [r.get("model_name", "Unknown") for r in self.results]
logger.info(f"🎯 Models found: {', '.join(model_names)}")
self._create_comparison_dataframe()
# Also load benchmark results
self.load_benchmark_results()
# Analyze actual model specifications for our models
self.analyze_our_model_specifications()
def _normalize_evaluation_data(self, data: dict, file_path: Path) -> dict[str, Any]:
"""Normalize evaluation data to consistent format for analysis."""
# Extract model name
original_name = data.get("model_name", "Unknown")
file_stem = file_path.stem.replace("comprehensive_eval_", "").replace("codesearchnet_eval_", "")
mapped_name = extract_model_name_from_filename(file_stem)
# Handle comprehensive format (new)
if "codesearch_overall" in data and "codesearch_languages" in data:
result = {
"model_name": mapped_name,
"original_model_name": original_name,
"overall": data.get("codesearch_overall", {}),
"languages": data.get("codesearch_languages", {}),
}
# Handle legacy format (old codesearchnet_eval files)
else:
result = {
"model_name": mapped_name,
"original_model_name": original_name,
"overall": data.get("overall", {}),
"languages": data.get("languages", {}),
}
return result
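# Input shapes handled above (illustrative):
#   comprehensive: {"model_name": ..., "codesearch_overall": {...}, "codesearch_languages": {...}, ...}
#   legacy:        {"model_name": ..., "overall": {...}, "languages": {...}}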
def _create_comparison_dataframe(self) -> None:
"""Create comparison DataFrame from results."""
if not self.results:
return
comparison_data = []
for result in self.results:
overall = result.get("overall", {})
row = {
"Model": result["model_name"],
"MRR": overall.get("mrr", 0),
"NDCG@1": overall.get("ndcg@1", 0),
"NDCG@5": overall.get("ndcg@5", 0),
"NDCG@10": overall.get("ndcg@10", 0),
"Recall@1": overall.get("recall@1", 0),
"Recall@5": overall.get("recall@5", 0),
"Recall@10": overall.get("recall@10", 0),
"Mean_Rank": overall.get("mean_rank", 0),
"Median_Rank": overall.get("median_rank", 0),
}
comparison_data.append(row)
self.comparison_df = pd.DataFrame(comparison_data)
if not self.comparison_df.empty:
self.comparison_df = self.comparison_df.sort_values("NDCG@10", ascending=False)
def print_summary(self) -> None:
"""Print summary of results."""
if not self.results:
logger.warning("No results to summarize")
return
print(f"\n{'=' * 60}")
print("CodeSearchNet Evaluation Summary")
print(f"{'=' * 60}")
print(f"Total models evaluated: {len(self.results)}")
if self.comparison_df is not None and not self.comparison_df.empty:
print(f"\nTop performing model: {self.comparison_df.iloc[0]['Model']}")
print(f"Best NDCG@10: {self.comparison_df.iloc[0]['NDCG@10']:.4f}")
print(f"Best MRR: {self.comparison_df['MRR'].max():.4f}")
print(f"\nEvaluated languages: {', '.join(CODE_LANGUAGES)}")
# Also print benchmark summary if available
if self.benchmark_results:
print(f"\n{'=' * 60}")
print("Performance Benchmark Summary")
print(f"{'=' * 60}")
print(f"Total models benchmarked: {len(self.benchmark_results)}")
if self.benchmark_df is not None and not self.benchmark_df.empty:
# Safely get fastest and smallest models
fastest_model = "N/A"
smallest_model = "N/A"
if "Throughput_TextsPerSec" in self.benchmark_df.columns:
fastest_idx = self.benchmark_df["Throughput_TextsPerSec"].idxmax()
fastest_model = str(self.benchmark_df.loc[fastest_idx, "Model"])
if "Disk_Size_MB" in self.benchmark_df.columns:
smallest_idx = self.benchmark_df["Disk_Size_MB"].idxmin()
smallest_model = str(self.benchmark_df.loc[smallest_idx, "Model"])
print(f"\nFastest model: {fastest_model}")
print(f"Smallest model: {smallest_model}")
def analyze_language_performance(self) -> None:
"""Analyze performance across programming languages."""
if not self.results:
return
print(f"\n{'=' * 60}")
print("Language-Specific Performance Analysis")
print(f"{'=' * 60}")
for result in self.results:
model_name = result["model_name"]
print(f"\nModel: {model_name}")
print("-" * 40)
languages = result.get("languages", {})
lang_data = []
for lang, lang_results in languages.items():
metrics = lang_results.get("metrics", {})
lang_data.append(
{
"Language": lang,
"NDCG@10": metrics.get("ndcg@10", 0),
"MRR": metrics.get("mrr", 0),
"Recall@5": metrics.get("recall@5", 0),
"Queries": lang_results.get("num_queries", 0),
}
)
if lang_data:
lang_df = pd.DataFrame(lang_data)
print(lang_df.to_string(index=False, float_format="%.4f"))
print(f"\nBest language: {lang_df.loc[lang_df['NDCG@10'].idxmax(), 'Language']}")
print(f"Average NDCG@10: {lang_df['NDCG@10'].mean():.4f}")
print(f"Average queries per language: {lang_df['Queries'].mean():.0f}")
def analyze_benchmark_performance(self) -> None:
"""Analyze and print benchmark performance summary."""
if not self.benchmark_results:
logger.warning("No benchmark results to analyze")
return
print(f"\n{'=' * 60}")
print("Performance Benchmark Analysis")
print(f"{'=' * 60}")
for result in self.benchmark_results:
model_name = result.get("model_name", "Unknown")
print(f"\nModel: {model_name}")
print("-" * 40)
# Size metrics
size_metrics = result.get("size_metrics", {})
if size_metrics:
print("πŸ“ Model Size:")
print(f" Disk Size: {size_metrics.get('disk_size_mb', 0):.1f} MB")
if "parameters_millions" in size_metrics:
print(f" Parameters: {size_metrics['parameters_millions']:.1f}M")
if "embedding_dim" in size_metrics:
print(f" Embedding Dimension: {size_metrics['embedding_dim']}")
# Speed metrics
speed_benchmarks = result.get("speed_benchmarks", {})
if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]:
batch_32 = speed_benchmarks["medium"]["batch_32"]
print("⚑ Performance (Batch 32, Medium Texts):")
print(f" Throughput: {batch_32.get('texts_per_second', 0):.1f} texts/sec")
print(f" Latency: {batch_32.get('time_per_text_ms', 0):.1f} ms/text")
print(f" Token Speed: {batch_32.get('tokens_per_second', 0):.0f} tokens/sec")
# CPU vs GPU
cpu_vs_gpu = result.get("cpu_vs_gpu", {})
if cpu_vs_gpu:
print("πŸ–₯️ CPU vs GPU:")
for device, metrics in cpu_vs_gpu.items():
if isinstance(metrics, dict) and "error" not in metrics:
print(f" {device.upper()}: {metrics.get('texts_per_second', 0):.1f} texts/sec")
# Memory efficiency
memory_benchmarks = result.get("memory_benchmarks", {})
if "batch_32" in memory_benchmarks:
batch_32_mem = memory_benchmarks["batch_32"]
if not batch_32_mem.get("oom", False) and "error" not in batch_32_mem:
print("πŸ’Ύ Memory Usage (Batch 32):")
print(f" Total: {batch_32_mem.get('memory_used_mb', 0):.1f} MB")
print(f" Per Text: {batch_32_mem.get('memory_per_text_mb', 0):.2f} MB")
def create_performance_radar_chart(self, model_name: str, language_scores: dict[str, float]) -> str:
"""Create radar chart showing performance across languages."""
if not PLOTLY_AVAILABLE:
logger.warning("Plotly not available, skipping radar chart")
return ""
languages = list(language_scores.keys())
scores = list(language_scores.values())
if not languages:
return ""
# Close the radar chart
languages_closed = [*languages, languages[0]]
scores_closed = [*scores, scores[0]]
fig = go.Figure()
fig.add_trace(
go.Scatterpolar(
r=scores_closed,
theta=languages_closed,
fill="toself",
name=model_name,
line_color="rgb(67, 147, 195)",
fillcolor="rgba(67, 147, 195, 0.3)",
)
)
fig.update_layout(
polar={"radialaxis": {"visible": True, "range": [0, max(scores) * 1.1]}},
showlegend=True,
title=f"CodeSearchNet Performance by Language: {model_name}",
width=800,
height=600,
)
static_path = self.images_dir / "code_performance_radar.png"
try:
fig.write_image(str(static_path), width=800, height=600, scale=2)
return str(static_path)
except Exception as e:
logger.warning(f"Could not create static image: {e}")
return ""
def create_comparative_radar_chart(self, simplified_models: list, peer_models: list) -> str:
"""Create comparative radar chart between best distilled model and top peer models."""
if not PLOTLY_AVAILABLE:
logger.warning("Plotly not available, skipping comparative radar chart")
return ""
if not simplified_models:
return ""
# Get the best simplified model
best_simplified = max(simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0))
# Get top 3 peer models by performance
peer_models_sorted = sorted(peer_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0), reverse=True)
top_peers = peer_models_sorted[:3]
models_to_compare = [best_simplified, *top_peers]
fig = go.Figure()
# Define colors for each model
colors = ["rgb(255, 99, 132)", "rgb(54, 162, 235)", "rgb(255, 205, 86)", "rgb(75, 192, 192)"]
# Collect all scores to determine the appropriate range
all_scores = []
for i, model_result in enumerate(models_to_compare):
model_name = model_result["model_name"]
languages = model_result.get("languages", {})
# Calculate language scores
language_scores = {}
for lang, lang_data in languages.items():
metrics = lang_data.get("metrics", {})
language_scores[lang.title()] = metrics.get("ndcg@10", 0)
if language_scores:
languages_list = list(language_scores.keys())
scores_list = list(language_scores.values())
all_scores.extend(scores_list) # Collect scores for range calculation
# Close the radar chart
languages_closed = [*languages_list, languages_list[0]]
scores_closed = [*scores_list, scores_list[0]]
# Determine line style - solid for best distilled, dash for peers
line_dash = "solid" if i == 0 else "dash"
line_width = 3 if i == 0 else 2
fig.add_trace(
go.Scatterpolar(
r=scores_closed,
theta=languages_closed,
fill="toself" if i == 0 else "none",
name=model_name,
line={"color": colors[i % len(colors)], "dash": line_dash, "width": line_width},
fillcolor=f"rgba{colors[i % len(colors)][3:-1]}, 0.2)" if i == 0 else None,
)
)
# Calculate dynamic range based on actual data
if all_scores:
max_score = max(all_scores)
# Set range to slightly above the maximum score with some padding
range_max = min(1.0, max_score * 1.1) # Cap at 1.0 since NDCG@10 max is 1.0
else:
range_max = 1.0 # Default fallback
fig.update_layout(
polar={"radialaxis": {"visible": True, "range": [0, range_max]}},
showlegend=True,
title="Model Comparison: Best Distilled vs Top Peer Models",
width=900,
height=700,
)
static_path = self.images_dir / "comparative_radar.png"
try:
fig.write_image(str(static_path), width=900, height=700, scale=2)
return str(static_path)
except Exception as e:
logger.warning(f"Could not create comparative radar chart: {e}")
return ""
def create_individual_radar_charts(self, simplified_models: list) -> dict[str, str]:
"""Create individual radar charts for all simplified models."""
radar_charts = {}
for result in simplified_models:
model_name = result["model_name"]
model_languages = result.get("languages", {})
model_language_scores = {}
for lang, lang_data in model_languages.items():
metrics = lang_data.get("metrics", {})
model_language_scores[lang.title()] = metrics.get("ndcg@10", 0)
if model_language_scores:
# Create unique filename for each model
safe_model_name = "".join(c for c in model_name if c.isalnum() or c in ("-", "_")).rstrip()
radar_chart_path = self.create_performance_radar_chart_individual(
model_name, model_language_scores, safe_model_name
)
if radar_chart_path:
radar_charts[model_name] = radar_chart_path
return radar_charts
def create_performance_radar_chart_individual(
self, model_name: str, language_scores: dict[str, float], filename_suffix: str
) -> str:
"""Create radar chart for individual model with unique filename."""
if not PLOTLY_AVAILABLE:
logger.warning("Plotly not available, skipping radar chart")
return ""
languages = list(language_scores.keys())
scores = list(language_scores.values())
if not languages:
return ""
# Close the radar chart
languages_closed = [*languages, languages[0]]
scores_closed = [*scores, scores[0]]
fig = go.Figure()
fig.add_trace(
go.Scatterpolar(
r=scores_closed,
theta=languages_closed,
fill="toself",
name=model_name,
line_color="rgb(67, 147, 195)",
fillcolor="rgba(67, 147, 195, 0.3)",
)
)
fig.update_layout(
polar={"radialaxis": {"visible": True, "range": [0, max(scores) * 1.1]}},
showlegend=True,
title=f"CodeSearchNet Performance by Language: {model_name}",
width=800,
height=600,
)
static_path = self.images_dir / f"radar_{filename_suffix}.png"
try:
fig.write_image(str(static_path), width=800, height=600, scale=2)
return str(static_path)
except Exception as e:
logger.warning(f"Could not create static image for {model_name}: {e}")
return ""
def plot_model_comparison(self, save_path: str | None = None) -> str:
"""Create comparison plots for models."""
if self.comparison_df is None or self.comparison_df.empty:
logger.warning("No comparison data available for plotting")
return ""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle("CodeSearchNet Model Comparison", fontsize=16, fontweight="bold")
# NDCG@10 comparison
axes[0, 0].barh(self.comparison_df["Model"], self.comparison_df["NDCG@10"])
axes[0, 0].set_title("NDCG@10 Comparison")
axes[0, 0].set_xlabel("NDCG@10")
# MRR comparison
axes[0, 1].barh(self.comparison_df["Model"], self.comparison_df["MRR"])
axes[0, 1].set_title("Mean Reciprocal Rank (MRR)")
axes[0, 1].set_xlabel("MRR")
# Recall@5 comparison
axes[1, 0].barh(self.comparison_df["Model"], self.comparison_df["Recall@5"])
axes[1, 0].set_title("Recall@5")
axes[1, 0].set_xlabel("Recall@5")
# Mean Rank comparison (lower is better)
axes[1, 1].barh(self.comparison_df["Model"], self.comparison_df["Mean_Rank"])
axes[1, 1].set_title("Mean Rank (lower is better)")
axes[1, 1].set_xlabel("Mean Rank")
plt.tight_layout()
output_path = save_path or str(self.images_dir / "model_comparison.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def plot_language_heatmap(self, save_path: str | None = None) -> str:
"""Create a heatmap of performance across languages."""
if not self.results:
return ""
# Prepare data for heatmap
heatmap_data = []
for result in self.results:
model_name = result["model_name"]
languages = result.get("languages", {})
row = {"Model": model_name}
for lang in CODE_LANGUAGES:
if lang in languages:
metrics = languages[lang].get("metrics", {})
row[lang.title()] = metrics.get("ndcg@10", 0)
else:
row[lang.title()] = 0
heatmap_data.append(row)
if not heatmap_data:
return ""
df = pd.DataFrame(heatmap_data).set_index("Model")
plt.figure(figsize=(12, 8))
sns.heatmap(
df,
annot=True,
fmt=".3f",
cmap="RdYlBu_r",
center=0.2,
vmin=0,
vmax=df.to_numpy().max(),
cbar_kws={"label": "NDCG@10 Score"},
)
plt.title(
"CodeSearchNet Performance Heatmap by Language",
fontsize=16,
fontweight="bold",
)
plt.xlabel("Programming Language", fontsize=12)
plt.ylabel("Model", fontsize=12)
plt.tight_layout()
output_path = save_path or str(self.images_dir / "language_heatmap.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def plot_benchmark_performance(self, save_path: str | None = None) -> str:
"""Create comprehensive benchmark performance plots."""
if not self.benchmark_results:
logger.warning("No benchmark data available for plotting")
return ""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle("Performance Benchmark Analysis", fontsize=16, fontweight="bold")
# 1. Model Size Comparison
if self.benchmark_df is not None and "Disk_Size_MB" in self.benchmark_df.columns:
axes[0, 0].barh(self.benchmark_df["Model"], self.benchmark_df["Disk_Size_MB"])
axes[0, 0].set_title("Model Size (MB)")
axes[0, 0].set_xlabel("Size (MB)")
# 2. Inference Throughput
if self.benchmark_df is not None and "Throughput_TextsPerSec" in self.benchmark_df.columns:
axes[0, 1].barh(self.benchmark_df["Model"], self.benchmark_df["Throughput_TextsPerSec"])
axes[0, 1].set_title("Inference Throughput")
axes[0, 1].set_xlabel("Texts/Second")
# 3. Memory Usage
if self.benchmark_df is not None and "Memory_Used_MB" in self.benchmark_df.columns:
axes[0, 2].barh(self.benchmark_df["Model"], self.benchmark_df["Memory_Used_MB"])
axes[0, 2].set_title("Memory Usage (Batch 32)")
axes[0, 2].set_xlabel("Memory (MB)")
# 4. Latency Comparison
if self.benchmark_df is not None and "Latency_MsPerText" in self.benchmark_df.columns:
axes[1, 0].barh(self.benchmark_df["Model"], self.benchmark_df["Latency_MsPerText"])
axes[1, 0].set_title("Inference Latency")
axes[1, 0].set_xlabel("Milliseconds/Text")
# 5. CPU vs GPU Performance
if self.benchmark_df is not None:
cpu_col = "CPU_TextsPerSec"
gpu_col = "CUDA_TextsPerSec"
if cpu_col in self.benchmark_df.columns and gpu_col in self.benchmark_df.columns:
x = np.arange(len(self.benchmark_df))
width = 0.35
axes[1, 1].bar(x - width / 2, self.benchmark_df[cpu_col], width, label="CPU", alpha=0.7)
axes[1, 1].bar(x + width / 2, self.benchmark_df[gpu_col], width, label="GPU", alpha=0.7)
axes[1, 1].set_title("CPU vs GPU Performance")
axes[1, 1].set_ylabel("Texts/Second")
axes[1, 1].set_xticks(x)
axes[1, 1].set_xticklabels(self.benchmark_df["Model"], rotation=45, ha="right")
axes[1, 1].legend()
# 6. Parameter Efficiency
if (
self.benchmark_df is not None
and "Parameters_M" in self.benchmark_df.columns
and "Throughput_TextsPerSec" in self.benchmark_df.columns
):
# Efficiency = Throughput / Parameters (higher is better)
efficiency = self.benchmark_df["Throughput_TextsPerSec"] / (self.benchmark_df["Parameters_M"] + 1e-6)
axes[1, 2].barh(self.benchmark_df["Model"], efficiency)
axes[1, 2].set_title("Parameter Efficiency")
axes[1, 2].set_xlabel("Texts/Sec per Million Parameters")
plt.tight_layout()
output_path = save_path or str(self.images_dir / "benchmark_performance.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def plot_batch_size_scaling(self, save_path: str | None = None) -> str:
"""Create batch size scaling analysis plot."""
if not self.benchmark_results:
return ""
plt.figure(figsize=(12, 8))
for result in self.benchmark_results:
model_name = result.get("model_name", "Unknown")
speed_benchmarks = result.get("speed_benchmarks", {})
# Extract batch size performance for medium texts
if "medium" in speed_benchmarks:
batch_sizes = []
throughputs = []
for batch_key, metrics in speed_benchmarks["medium"].items():
if batch_key.startswith("batch_"):
batch_size = int(batch_key.split("_")[1])
throughput = metrics.get("texts_per_second", 0)
batch_sizes.append(batch_size)
throughputs.append(throughput)
if batch_sizes:
plt.plot(batch_sizes, throughputs, marker="o", label=model_name, linewidth=2)
plt.xlabel("Batch Size", fontsize=12)
plt.ylabel("Throughput (Texts/Second)", fontsize=12)
plt.title("Batch Size Scaling Performance", fontsize=16, fontweight="bold")
plt.legend()
plt.grid(visible=True, alpha=0.3)
plt.xscale("log", base=2)
output_path = save_path or str(self.images_dir / "batch_size_scaling.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def plot_memory_scaling(self, save_path: str | None = None) -> str:
"""Create memory scaling analysis plot."""
if not self.benchmark_results:
return ""
plt.figure(figsize=(12, 8))
for result in self.benchmark_results:
model_name = result.get("model_name", "Unknown")
memory_benchmarks = result.get("memory_benchmarks", {})
batch_sizes = []
memory_usage = []
for batch_key, metrics in memory_benchmarks.items():
if batch_key.startswith("batch_") and not metrics.get("oom", False) and "error" not in metrics:
batch_size = int(batch_key.split("_")[1])
memory_mb = metrics.get("memory_used_mb", 0)
batch_sizes.append(batch_size)
memory_usage.append(memory_mb)
if batch_sizes:
plt.plot(batch_sizes, memory_usage, marker="s", label=model_name, linewidth=2)
plt.xlabel("Batch Size", fontsize=12)
plt.ylabel("Memory Usage (MB)", fontsize=12)
plt.title("Memory Scaling by Batch Size", fontsize=16, fontweight="bold")
plt.legend()
plt.grid(visible=True, alpha=0.3)
plt.xscale("log", base=2)
output_path = save_path or str(self.images_dir / "memory_scaling.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def create_peer_comparison_chart(self, model_name: str) -> str:
"""Create comparison chart using actual evaluation results."""
if self.comparison_df is None or self.comparison_df.empty:
logger.warning("No comparison data available for peer comparison chart")
return ""
# Use actual evaluation results instead of hardcoded scores
df_sorted = self.comparison_df.sort_values("NDCG@10", ascending=True)
plt.figure(figsize=(12, 8))
# Color models differently - highlight the user's model
colors = []
for model in df_sorted["Model"]:
if model_name.lower() in model.lower() or "gte_qwen2_m2v_code" in model.lower():
colors.append("red") # User's model
else:
colors.append("skyblue") # Peer models
bars = plt.barh(df_sorted["Model"], df_sorted["NDCG@10"], color=colors)
# Highlight current model with special formatting
for i, model in enumerate(df_sorted["Model"]):
if model_name.lower() in model.lower() or "gte_qwen2_m2v_code" in model.lower():
bars[i].set_alpha(0.8)
bars[i].set_edgecolor("black")
bars[i].set_linewidth(2)
plt.xlabel("NDCG@10 Score", fontsize=12)
plt.title(
"CodeSearchNet Model Comparison (Actual Results)",
fontsize=16,
fontweight="bold",
)
plt.grid(axis="x", alpha=0.3)
# Add score labels
for i, score in enumerate(df_sorted["NDCG@10"]):
plt.text(score + 0.005, i, f"{score:.3f}", va="center")
plt.tight_layout()
output_path = self.images_dir / "peer_comparison.png"
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return str(output_path)
def create_efficiency_analysis(self, model_name: str) -> str:
"""Create efficiency analysis chart using actual evaluation results."""
if self.comparison_df is None or self.comparison_df.empty:
logger.warning("No comparison data available for efficiency analysis")
return ""
models = []
scores = []
params = []
is_user_model = []
# Process all evaluated models
for _, row in self.comparison_df.iterrows():
model_display_name = row["Model"]
current_model_score = row["NDCG@10"]
# Determine if this is the user's model
is_users = (
model_name.lower() in model_display_name.lower() or "gte_qwen2_m2v_code" in model_display_name.lower()
)
if is_users:
# User's distilled model
models.append(model_display_name)
# Safe conversion to float for pandas values
score_value = pd.to_numeric(current_model_score, errors="coerce")
scores.append(float(score_value) if not pd.isna(score_value) else 0.0)
# Safe conversion for DISTILLED_MODEL_SPECS parameters
param_value = DISTILLED_MODEL_SPECS.get("parameters", 39)
params.append(float(param_value) if isinstance(param_value, (int, float)) else 39.0)
is_user_model.append(True)
else:
# Find corresponding peer model specs
model_key = None
for peer_key in MODEL_SPECS:
peer_short_name = peer_key.split("/")[-1].lower()
if peer_short_name in model_display_name.lower():
model_key = peer_key
break
if model_key and model_key in MODEL_SPECS:
models.append(model_display_name.split("/")[-1]) # Short name
# Safe conversion to float for pandas values
score_value = pd.to_numeric(current_model_score, errors="coerce")
scores.append(float(score_value) if not pd.isna(score_value) else 0.0)
param_value = MODEL_SPECS[model_key].get("parameters", 100.0)
params.append(float(param_value) if isinstance(param_value, (int, float)) else 100.0)
is_user_model.append(False)
if not models:
logger.warning("No models with parameter specifications found")
return ""
plt.figure(figsize=(12, 8))
# Plot peer models
peer_models = [m for i, m in enumerate(models) if not is_user_model[i]]
peer_params = [p for i, p in enumerate(params) if not is_user_model[i]]
peer_scores = [s for i, s in enumerate(scores) if not is_user_model[i]]
if peer_models:
plt.scatter(
peer_params,
peer_scores,
s=100,
alpha=0.6,
label="Peer Models",
color="skyblue",
)
# Plot user's model
user_models = [m for i, m in enumerate(models) if is_user_model[i]]
user_params = [p for i, p in enumerate(params) if is_user_model[i]]
user_scores = [s for i, s in enumerate(scores) if is_user_model[i]]
if user_models:
plt.scatter(
user_params,
user_scores,
s=200,
color="red",
alpha=0.8,
label=f"{user_models[0]} (Distilled)",
marker="*",
)
# Add model labels
for i, (model, param, score) in enumerate(zip(models, params, scores, strict=False)):
if is_user_model[i]:
plt.annotate(
model,
(param, score),
xytext=(10, 10),
textcoords="offset points",
fontweight="bold",
color="red",
)
else:
plt.annotate(
model,
(param, score),
xytext=(5, 5),
textcoords="offset points",
fontsize=9,
)
plt.xlabel("Model Size (Million Parameters)", fontsize=12)
plt.ylabel("NDCG@10 Score", fontsize=12)
plt.title(
"Model Efficiency: Performance vs Size (Actual Results)",
fontsize=16,
fontweight="bold",
)
plt.legend()
plt.grid(visible=True, alpha=0.3)
plt.xscale("log")
plt.tight_layout()
output_path = self.images_dir / "efficiency_analysis.png"
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return str(output_path)
def plot_model_specifications(self, save_path: str | None = None) -> str:
"""Create visualization of our model specifications."""
if not self.model_specs:
logger.warning("No model specifications available for plotting")
return ""
# Filter only successfully analyzed models
successful_specs = {k: v for k, v in self.model_specs.items() if v.get("analysis_successful", False)}
if not successful_specs:
logger.warning("No successfully analyzed models for plotting")
return ""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle("Our Distilled Models - Specifications Analysis", fontsize=16, fontweight="bold")
# Extract data
model_names = list(successful_specs.keys())
# Shorten model names for better display
display_names = [name.replace("code_model2vec_", "").replace("_", " ") for name in model_names]
vocab_sizes = [spec["vocabulary_size"] for spec in successful_specs.values()]
param_counts = [spec["parameters_millions"] for spec in successful_specs.values()]
embed_dims = [spec["embedding_dimensions"] for spec in successful_specs.values()]
disk_sizes = [spec["disk_size_mb"] for spec in successful_specs.values()]
# 1. Vocabulary Size Comparison
axes[0, 0].barh(display_names, vocab_sizes, color="skyblue")
axes[0, 0].set_title("Vocabulary Size")
axes[0, 0].set_xlabel("Number of Tokens")
for i, v in enumerate(vocab_sizes):
axes[0, 0].text(v + max(vocab_sizes) * 0.01, i, f"{v:,}", va="center", fontsize=9)
# 2. Parameter Count Comparison
axes[0, 1].barh(display_names, param_counts, color="lightgreen")
axes[0, 1].set_title("Model Parameters")
axes[0, 1].set_xlabel("Parameters (Millions)")
for i, v in enumerate(param_counts):
axes[0, 1].text(v + max(param_counts) * 0.01, i, f"{v:.1f}M", va="center", fontsize=9)
# 3. Embedding Dimensions
axes[1, 0].barh(display_names, embed_dims, color="lightsalmon")
axes[1, 0].set_title("Embedding Dimensions")
axes[1, 0].set_xlabel("Dimensions")
for i, v in enumerate(embed_dims):
axes[1, 0].text(v + max(embed_dims) * 0.01, i, f"{v}", va="center", fontsize=9)
# 4. Disk Size
axes[1, 1].barh(display_names, disk_sizes, color="plum")
axes[1, 1].set_title("Model Size on Disk")
axes[1, 1].set_xlabel("Size (MB)")
for i, v in enumerate(disk_sizes):
axes[1, 1].text(v + max(disk_sizes) * 0.01, i, f"{v:.1f}MB", va="center", fontsize=9)
plt.tight_layout()
output_path = save_path or str(self.images_dir / "model_specifications.png")
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close()
return output_path
def generate_comprehensive_report(self, model_name: str = "Simplified Distillation Models") -> str:
"""Generate comprehensive markdown report for all evaluated models."""
if not self.results:
logger.error("No results to analyze")
return ""
# Find all simplified distillation models
simplified_models = []
peer_models = []
for result in self.results:
result_model_name = result["model_name"]
if (
"code_model2vec" in result_model_name.lower()
or "distilled" in result_model_name.lower()
or "(ours)" in result_model_name.lower()
):
simplified_models.append(result)
else:
peer_models.append(result)
# Get the best performing simplified model for main analysis
if simplified_models:
main_result = max(simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0))
main_model_name = main_result["model_name"]
else:
# Fallback to first result if no simplified models found
main_result = self.results[0]
main_model_name = main_result["model_name"]
overall = main_result.get("overall", {})
languages = main_result.get("languages", {})
# Calculate language scores for radar chart
language_scores = {}
for lang, lang_data in languages.items():
metrics = lang_data.get("metrics", {})
language_scores[lang.title()] = metrics.get("ndcg@10", 0)
# Create visualizations
logger.info("Generating visualizations...")
output_dir, images_dir, reports_dir = setup_directories()
self.create_performance_radar_chart(main_model_name, language_scores)
comparison_chart = self.plot_model_comparison()
heatmap_chart = self.plot_language_heatmap()
peer_chart = self.create_peer_comparison_chart(main_model_name)
efficiency_chart = self.create_efficiency_analysis(main_model_name)
model_specs_chart = self.plot_model_specifications()
# Generate individual radar charts for all simplified models
individual_radar_charts = self.create_individual_radar_charts(simplified_models)
# Create comparative radar chart (best distilled vs top peer models)
comparative_radar_chart = self.create_comparative_radar_chart(simplified_models, peer_models)
# Create benchmark visualizations
benchmark_chart = ""
batch_scaling_chart = ""
memory_scaling_chart = ""
if self.benchmark_results:
benchmark_chart = self.plot_benchmark_performance()
batch_scaling_chart = self.plot_batch_size_scaling()
memory_scaling_chart = self.plot_memory_scaling()
# Generate report
report = f"""# Code-Specialized Model2Vec Distillation Analysis
## 🎯 Executive Summary
This report presents a comprehensive analysis of Model2Vec distillation experiments using different teacher models for code-specialized embedding generation.
### Evaluated Models Overview
**Simplified Distillation Models:** {len(simplified_models)}
**Peer Comparison Models:** {len(peer_models)}
**Total Models Analyzed:** {len(self.results)}
### Best Performing Simplified Model: {main_model_name}
**Overall CodeSearchNet Performance:**
- **NDCG@10**: {overall.get("ndcg@10", 0):.4f}
- **Mean Reciprocal Rank (MRR)**: {overall.get("mrr", 0):.4f}
- **Recall@5**: {overall.get("recall@5", 0):.4f}
- **Mean Rank**: {overall.get("mean_rank", 0):.1f}
## πŸ“Š Comprehensive Model Comparison
### All Simplified Distillation Models Performance
"""
# Add table of all simplified models
if simplified_models:
report += "| Model | Teacher | NDCG@10 | MRR | Recall@5 | Status |\n"
report += "|-------|---------|---------|-----|----------|--------|\n"
# Sort by performance
simplified_models_sorted = sorted(
simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0), reverse=True
)
for rank, result in enumerate(simplified_models_sorted, 1):
model_display = result["model_name"]
overall_metrics = result.get("overall", {})
# Extract teacher model name from model name
teacher_name, teacher_link = get_teacher_model_info(model_display)
status = "πŸ₯‡ Best" if rank == 1 else "πŸ₯ˆ 2nd" if rank == 2 else "πŸ₯‰ 3rd" if rank == 3 else f"#{rank}"
# Use linked teacher name if available
teacher_display = f"[{teacher_name}]({teacher_link})" if teacher_link else teacher_name
report += f"| {model_display} | {teacher_display} | {overall_metrics.get('ndcg@10', 0):.4f} | {overall_metrics.get('mrr', 0):.4f} | {overall_metrics.get('recall@5', 0):.4f} | {status} |\n"
# Add model specifications section
if self.model_specs:
successful_specs = {k: v for k, v in self.model_specs.items() if v.get("analysis_successful", False)}
if successful_specs:
report += """
### πŸ“Š Model Specifications Analysis
Our distilled models exhibit consistent architectural characteristics across different teacher models:
| Model | Vocabulary Size | Parameters | Embedding Dim | Disk Size |
|-------|----------------|------------|---------------|-----------|
"""
# Sort models by performance for consistency
for result in simplified_models_sorted:
model_display = result["model_name"]
if model_display in successful_specs:
spec = successful_specs[model_display]
vocab_size = spec["vocabulary_size"]
params_m = spec["parameters_millions"]
embed_dim = spec["embedding_dimensions"]
disk_size = spec["disk_size_mb"]
report += f"| {model_display.replace('code_model2vec_', '')} | {vocab_size:,} | {params_m:.1f}M | {embed_dim} | {disk_size:.1f}MB |\n"
if model_specs_chart:
report += f"""
![Model Specifications]({model_specs_chart})
*Comprehensive analysis of our distilled models showing vocabulary size, parameter count, embedding dimensions, and storage requirements.*
#### Key Insights from Model Specifications:
"""
# Calculate some insights
vocab_sizes = [spec["vocabulary_size"] for spec in successful_specs.values()]
param_counts = [spec["parameters_millions"] for spec in successful_specs.values()]
embed_dims = [spec["embedding_dimensions"] for spec in successful_specs.values()]
disk_sizes = [spec["disk_size_mb"] for spec in successful_specs.values()]
if vocab_sizes:
avg_vocab = sum(vocab_sizes) / len(vocab_sizes)
avg_params = sum(param_counts) / len(param_counts)
avg_disk = sum(disk_sizes) / len(disk_sizes)
report += f"""
- **Vocabulary Consistency**: All models use vocabulary sizes ranging from {min(vocab_sizes):,} to {max(vocab_sizes):,} tokens (avg: {avg_vocab:,.0f})
- **Parameter Efficiency**: Models range from {min(param_counts):.1f}M to {max(param_counts):.1f}M parameters (avg: {avg_params:.1f}M)
- **Storage Efficiency**: Disk usage ranges from {min(disk_sizes):.1f}MB to {max(disk_sizes):.1f}MB (avg: {avg_disk:.1f}MB)
- **Embedding Dimensions**: Consistent {embed_dims[0]} dimensions across all models (optimized for efficiency)
"""
report += """
### Key Findings
"""
if simplified_models and len(simplified_models) > 1:
best_model = simplified_models_sorted[0]
worst_model = simplified_models_sorted[-1]
best_score = best_model.get("overall", {}).get("ndcg@10", 0)
worst_score = worst_model.get("overall", {}).get("ndcg@10", 0)
report += f"""
- **Best Teacher Model**: {best_model["model_name"]} (NDCG@10: {best_score:.4f})
- **Least Effective Teacher**: {worst_model["model_name"]} (NDCG@10: {worst_score:.4f})
- **Performance Range**: {((best_score - worst_score) / best_score * 100):.1f}% difference between best and worst
- **Average Performance**: {sum(r.get("overall", {}).get("ndcg@10", 0) for r in simplified_models) / len(simplified_models):.4f} NDCG@10
"""
# Add radar charts section
report += """
## 🎯 Language Performance Radar Charts
### Best Model vs Peer Models Comparison
"""
if comparative_radar_chart:
report += f"![Comparative Radar Chart]({comparative_radar_chart})\n\n"
report += "*Comparative view showing how the best simplified distillation model performs against top peer models across programming languages.*\n\n"
# Add individual radar charts for all simplified models (sorted by performance)
if individual_radar_charts:
report += "### Individual Model Performance by Language\n\n"
# Sort the radar charts by model performance (best to worst)
for result in simplified_models_sorted:
chart_model_name = result["model_name"]
if chart_model_name in individual_radar_charts:
chart_path = individual_radar_charts[chart_model_name]
# Extract teacher name for cleaner display
teacher_name, teacher_link = get_teacher_model_info(chart_model_name)
# Use linked teacher name if available
teacher_display = f"[{teacher_name}]({teacher_link})" if teacher_link else teacher_name
# Get performance for display
overall_metrics = result.get("overall", {})
ndcg_score = overall_metrics.get("ndcg@10", 0)
report += f"#### {chart_model_name} (Teacher: {teacher_display}) - NDCG@10: {ndcg_score:.4f}\n\n"
report += f"![{chart_model_name} Radar Chart]({chart_path})\n\n"
report += f"""
## πŸ† Peer Model Comparison
![Peer Comparison]({peer_chart})
*Comparison with established code-specialized embedding models using actual evaluation results.*
### Complete Model Ranking
"""
# Add comprehensive ranking table
if self.comparison_df is not None and len(self.comparison_df) > 0:
report += "| Rank | Model | Type | NDCG@10 | MRR | Recall@5 |\n"
report += "|------|-------|------|---------|-----|----------|\n"
for rank in range(len(self.comparison_df)):
row_data = self.comparison_df.iloc[rank]
model_name_display = str(row_data["Model"])
# Determine model type
if (
"code_model2vec" in model_name_display.lower()
or "distilled" in model_name_display.lower()
or "(ours)" in model_name_display.lower()
):
# Check if it's a fine-tuned model
if "fine_tuned" in model_name_display.lower():
model_type = "**πŸŽ“ Fine-tuned Distillation**"
else:
model_type = "**πŸ”₯ Simplified Distillation**"
elif any(code_term in model_name_display.lower() for code_term in ["codebert", "graphcode", "codet5"]):
model_type = "Code-Specific"
elif "potion" in model_name_display.lower():
model_type = "Model2Vec"
else:
model_type = "General"
report += f"| {rank + 1} | {model_name_display} | {model_type} | {row_data['NDCG@10']:.4f} | {row_data['MRR']:.4f} | {row_data['Recall@5']:.4f} |\n"
report += f"""
## 📈 Performance Analysis
### Multi-Model Comparison Charts
![Model Comparison]({comparison_chart})
*Comprehensive comparison across all evaluation metrics.*
### Language Performance Analysis
![Language Heatmap]({heatmap_chart})
*Performance heatmap showing how different models perform across programming languages.*
### Efficiency Analysis
![Efficiency Analysis]({efficiency_chart})
*Performance vs model size analysis showing the efficiency benefits of distillation.*
"""
# Add benchmark analysis if available
if self.benchmark_results:
report += f"""
## ⚡ Operational Performance Analysis
![Benchmark Performance]({benchmark_chart})
*Comprehensive performance benchmarking across multiple operational metrics.*
### Performance Scaling Analysis
![Batch Size Scaling]({batch_scaling_chart})
*How performance scales with different batch sizes for optimal throughput.*
![Memory Scaling]({memory_scaling_chart})
*Memory usage patterns across different batch sizes.*
"""
# Add detailed language analysis
report += """
## πŸ” Language-Specific Analysis
### Performance by Programming Language
"""
if language_scores:
report += "| Language | Best Model Performance | Average Performance | Language Difficulty |\n"
report += "|----------|------------------------|--------------------|--------------------|\n"
for lang in sorted(language_scores.keys()):
# Find best performance for this language across all models
lang_performances = []
for result in self.results:
lang_data = result.get("languages", {}).get(lang.lower(), {})
if lang_data:
lang_performances.append(lang_data.get("metrics", {}).get("ndcg@10", 0))
if lang_performances:
best_lang_perf = max(lang_performances)
avg_lang_perf = sum(lang_performances) / len(lang_performances)
difficulty = "Easy" if avg_lang_perf > 0.3 else "Medium" if avg_lang_perf > 0.2 else "Hard"
report += f"| {lang} | {best_lang_perf:.4f} | {avg_lang_perf:.4f} | {difficulty} |\n"
report += """
## 🎯 Conclusions and Recommendations
### Teacher Model Analysis
Based on the evaluation results across all simplified distillation models:
"""
if simplified_models and len(simplified_models) > 1:
# Analyze which teacher models work best
teacher_performance = {}
for result in simplified_models:
model_name = result["model_name"]
score = result.get("overall", {}).get("ndcg@10", 0)
teacher_name, _ = get_teacher_model_info(model_name)  # link is not needed for this aggregation
teacher_performance[teacher_name] = score
if teacher_performance:
best_teacher = max(teacher_performance.items(), key=lambda x: x[1])
worst_teacher = min(teacher_performance.items(), key=lambda x: x[1])
report += f"""
1. **Best Teacher Model**: {best_teacher[0]} (NDCG@10: {best_teacher[1]:.4f})
2. **Least Effective Teacher**: {worst_teacher[0]} (NDCG@10: {worst_teacher[1]:.4f})
3. **Teacher Model Impact**: Choice of teacher model affects performance by {((best_teacher[1] - worst_teacher[1]) / best_teacher[1] * 100):.1f}%
### Recommendations
- **For Production**: Use {best_teacher[0]} as teacher model for best performance
- **For Efficiency**: Model2Vec distillation provides significant size reduction with competitive performance
- **For Code Tasks**: Specialized models consistently outperform general-purpose models
"""
report += f"""
## 📄 Methodology
### Evaluation Protocol
- **Dataset**: CodeSearchNet test sets for 6 programming languages
- **Metrics**: NDCG@k, MRR, Recall@k following CodeSearchNet methodology
- **Query Format**: Natural language documentation strings
- **Corpus Format**: Function code strings
- **Evaluation**: Retrieval of correct code for each documentation query
### Teacher Models Tested
- [sentence-transformers/all-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2) (proven baseline)
- [sentence-transformers/all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) (general purpose)
- [sentence-transformers/paraphrase-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2) (paraphrase model)
- [microsoft/codebert-base](https://huggingface.co/microsoft/codebert-base) (code-specialized)
- [microsoft/graphcodebert-base](https://huggingface.co/microsoft/graphcodebert-base) (graph-aware code model)
- [Alibaba-NLP/gte-Qwen2-1.5B-instruct](https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct) (instruction model)
- [BAAI/bge-m3](https://huggingface.co/BAAI/bge-m3) (multilingual model)
- [jinaai/jina-embeddings-v3](https://huggingface.co/jinaai/jina-embeddings-v3) (modern embedding model)
- [nomic-ai/nomic-embed-text-v2-moe](https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe) (mixture of experts)
- [Qodo/Qodo-Embed-1-1.5B](https://huggingface.co/Qodo/Qodo-Embed-1-1.5B) (code-specialized)
- [lightonai/Reason-ModernColBERT](https://huggingface.co/lightonai/Reason-ModernColBERT) (ColBERT architecture)
- [Linq-AI-Research/Linq-Embed-Mistral](https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral) (Mistral-based)
- [BAAI/bge-code-v1](https://huggingface.co/BAAI/bge-code-v1) (code-specialized BGE)
- [Salesforce/SFR-Embedding-Code-2B_R](https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R) (large code model)
### Distillation Method
- **Technique**: Model2Vec static embedding generation
- **Parameters**: PCA dims=256, SIF coefficient=1e-3, Zipf weighting=True
- **Training Data**: CodeSearchNet comment-code pairs
- **Languages**: Python, JavaScript, Java, PHP, Ruby, Go
---
*Report generated on {time.strftime("%Y-%m-%d %H:%M:%S")} using automated analysis pipeline.*
*For questions about methodology or results, please refer to the CodeSearchNet documentation.*
"""
return report
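# ---------------------------------------------------------------------------
# Illustrative sketch only (not called by the pipeline): how the retrieval
# metrics referenced in the generated report (MRR, NDCG@10, Recall@5) can be
# computed for the CodeSearchNet setting, where each query (a docstring) has
# exactly one relevant code snippet. The real computation happens in the
# evaluation module; this standalone helper and its name are hypothetical and
# exist purely for documentation. Corpus-level scores are the mean of these
# per-query values.
def _example_retrieval_metrics(ranked_ids: list[str], relevant_id: str, k: int = 10) -> dict[str, float]:
    """Single-query MRR, NDCG@k, and Recall@k for a one-relevant-item retrieval task."""
    try:
        rank = ranked_ids.index(relevant_id) + 1  # 1-based position of the correct snippet
    except ValueError:  # correct snippet not retrieved at all
        return {"mrr": 0.0, f"ndcg@{k}": 0.0, f"recall@{k}": 0.0}
    mrr = 1.0 / rank
    # With a single relevant item, IDCG@k = 1, so NDCG@k reduces to 1 / log2(rank + 1) when rank <= k.
    ndcg = float(1.0 / np.log2(rank + 1)) if rank <= k else 0.0
    recall = 1.0 if rank <= k else 0.0
    return {"mrr": mrr, f"ndcg@{k}": ndcg, f"recall@{k}": recall}
# Example: _example_retrieval_metrics(["fn_a", "fn_b", "fn_c"], "fn_b")
# -> {"mrr": 0.5, "ndcg@10": ~0.63, "recall@10": 1.0}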
def export_results(self, output_file: str) -> None:
"""Export results to CSV format."""
if self.comparison_df is not None:
self.comparison_df.to_csv(output_file, index=False)
logger.info(f"Results exported to {output_file}")
def main(
results_dir: str = DEFAULT_EVALUATION_DIR,
model_name: str = "code_model2vec_distilled_models",
output: str = "REPORT.md",
export_csv: str | None = None,
) -> None:
"""Main analysis function."""
logger.info("Starting CodeSearchNet Analysis with Integrated Benchmarks")
logger.info("=" * 60)
# Setup output directories
output_dir, images_dir, reports_dir = setup_directories()
# Initialize analyzer with results directory (benchmarks are integrated)
analyzer = CodeSearchNetAnalyzer(
results_dir=results_dir,
benchmark_dir=None, # No longer needed - benchmarks are in comprehensive files
images_dir=images_dir,
)
# Load results (this will also load benchmark data from comprehensive files)
analyzer.load_results()
if not analyzer.results:
logger.error("No evaluation results found! Please run evaluation first.")
return
# Print summary (includes both evaluation and benchmark summaries)
analyzer.print_summary()
analyzer.analyze_language_performance()
# Analyze benchmark performance if available
if analyzer.benchmark_results:
analyzer.analyze_benchmark_performance()
else:
logger.warning("No benchmark results found. Models may have been evaluated with --skip-benchmark flag.")
# Generate comprehensive report with benchmark integration
logger.info("Generating comprehensive report with integrated benchmark data...")
report = analyzer.generate_comprehensive_report(model_name)
# Save report
report_path = Path(output)
with report_path.open("w") as f:
f.write(report)
# Export CSV if requested
if export_csv:
analyzer.export_results(export_csv)
# Export benchmark CSV if available
if analyzer.benchmark_df is not None and not analyzer.benchmark_df.empty:
benchmark_csv = report_path.parent / f"{model_name}_benchmark_comparison.csv"
analyzer.benchmark_df.to_csv(benchmark_csv, index=False)
logger.info(f"πŸ“Š Benchmark comparison saved to: {benchmark_csv}")
logger.info("βœ… CodeSearchNet analysis with integrated benchmarks complete!")
logger.info(f"πŸ“Š Report saved to: {report_path}")
logger.info(f"πŸ–ΌοΈ Charts saved to: {images_dir}")
logger.info(f"πŸ’Ύ Source: Comprehensive evaluation files in {results_dir}")
if __name__ == "__main__":
main()