""" Comprehensive CodeSearchNet Analysis and Reporting Script. This script provides a complete CodeSearchNet evaluation pipeline that includes: 1. Model evaluation results analysis 2. Peer model comparison analysis 3. Advanced visualizations and charts 4. Leaderboard comparison and ranking analysis 5. Comprehensive README report generation 6. Performance efficiency analysis 7. Language-specific performance analysis Features: - CodeSearchNet-style scoring (NDCG@10, MRR, Recall metrics) - Comparison with peer code-specialized models - Model efficiency metrics (performance per parameter) - Interactive visualizations with Plotly and Matplotlib - Professional charts for README integration - Statistical analysis of results across programming languages Usage: python analyze.py --results-dir results/ --model-name my_model distiller analyze --results-dir evaluation_results """ import json import logging import time from pathlib import Path from typing import Any import matplotlib.pyplot as plt import numpy as np import pandas as pd import seaborn as sns from .config import directories # Optional Plotly import with fallback PLOTLY_AVAILABLE = True try: import plotly.graph_objects as go except ImportError: PLOTLY_AVAILABLE = False # Set plotting style try: plt.style.use("seaborn-v0_8") except OSError: plt.style.use("seaborn") # Fallback for older matplotlib versions sns.set_palette("husl") # ============================================================================= # CONFIGURATION # ============================================================================= # Constants MIN_SCORES_FOR_STATS = 2 HIGH_PERFORMANCE_THRESHOLD = 0.3 MEDIUM_PERFORMANCE_THRESHOLD = 0.2 # Model Configuration MODEL_NAME = "code_model2vec_analysis" # Generic name for multi-model analysis ORIGINAL_MODEL_NAME = "Alibaba-NLP/gte-Qwen2-7B-instruct" OUTPUT_DIR = Path("analysis_results") IMAGES_DIR = Path("analysis_charts") REPORT_FILE = Path("REPORT.md") # Changed from README.md # Local directories for results - using standardized directories from config DEFAULT_EVALUATION_DIR = directories.evaluation_results DEFAULT_BENCHMARK_DIR = directories.benchmark_results # CodeSearchNet Languages CODE_LANGUAGES = ["python", "javascript", "java", "php", "ruby", "go"] # Model name mapping from the default models in evaluate.py and benchmark.py MODEL_NAME_MAPPING = { # File names to display names and HuggingFace links "all-MiniLM-L6-v2": { "name": "sentence-transformers/all-MiniLM-L6-v2", "link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2", }, "all-mpnet-base-v2": { "name": "sentence-transformers/all-mpnet-base-v2", "link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2", }, "paraphrase-MiniLM-L6-v2": { "name": "sentence-transformers/paraphrase-MiniLM-L6-v2", "link": "https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2", }, "codebert-base": {"name": "microsoft/codebert-base", "link": "https://huggingface.co/microsoft/codebert-base"}, "graphcodebert-base": { "name": "microsoft/graphcodebert-base", "link": "https://huggingface.co/microsoft/graphcodebert-base", }, "CodeBERTa-small-v1": { "name": "huggingface/CodeBERTa-small-v1", "link": "https://huggingface.co/huggingface/CodeBERTa-small-v1", }, "all-MiniLM-L12-v2": { "name": "sentence-transformers/all-MiniLM-L12-v2", "link": "https://huggingface.co/sentence-transformers/all-MiniLM-L12-v2", }, "potion-base-8M": {"name": "minishlab/potion-base-8M", "link": "https://huggingface.co/minishlab/potion-base-8M"}, "potion-retrieval-32M": { "name": 
"minishlab/potion-retrieval-32M", "link": "https://huggingface.co/minishlab/potion-retrieval-32M", }, "codet5-base": {"name": "Salesforce/codet5-base", "link": "https://huggingface.co/Salesforce/codet5-base"}, "gte-Qwen2-1.5B-instruct": { "name": "Alibaba-NLP/gte-Qwen2-1.5B-instruct", "link": "https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct", }, "bge-m3": {"name": "BAAI/bge-m3", "link": "https://huggingface.co/BAAI/bge-m3"}, "jina-embeddings-v3": { "name": "jinaai/jina-embeddings-v3", "link": "https://huggingface.co/jinaai/jina-embeddings-v3", }, "nomic-embed-text-v2-moe": { "name": "nomic-ai/nomic-embed-text-v2-moe", "link": "https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe", }, "Qodo-Embed-1-1.5B": {"name": "Qodo/Qodo-Embed-1-1.5B", "link": "https://huggingface.co/Qodo/Qodo-Embed-1-1.5B"}, "Reason-ModernColBERT": { "name": "lightonai/Reason-ModernColBERT", "link": "https://huggingface.co/lightonai/Reason-ModernColBERT", }, "Linq-Embed-Mistral": { "name": "Linq-AI-Research/Linq-Embed-Mistral", "link": "https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral", }, "bge-code-v1": {"name": "BAAI/bge-code-v1", "link": "https://huggingface.co/BAAI/bge-code-v1"}, "SFR-Embedding-Code-2B_R": { "name": "Salesforce/SFR-Embedding-Code-2B_R", "link": "https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R", }, } # Reverse mapping for lookups - using just the names DISPLAY_NAME_TO_FILE = {v["name"]: k for k, v in MODEL_NAME_MAPPING.items()} # Peer models for comparison (code-specialized models) PEER_MODELS = { "sentence-transformers/all-MiniLM-L6-v2": { "overall_ndcg": 0.25, "type": "General", "link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2", }, "microsoft/codebert-base": { "overall_ndcg": 0.32, "type": "Code-Specific", "link": "https://huggingface.co/microsoft/codebert-base", }, "microsoft/graphcodebert-base": { "overall_ndcg": 0.35, "type": "Code-Specific", "link": "https://huggingface.co/microsoft/graphcodebert-base", }, "huggingface/CodeBERTa-small-v1": { "overall_ndcg": 0.28, "type": "Code-Specific", "link": "https://huggingface.co/huggingface/CodeBERTa-small-v1", }, "sentence-transformers/all-mpnet-base-v2": { "overall_ndcg": 0.27, "type": "General", "link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2", }, } # Model specifications for efficiency analysis MODEL_SPECS = { "sentence-transformers/all-MiniLM-L6-v2": { "parameters": 22.7, "size_mb": 90, "link": "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2", }, "microsoft/codebert-base": { "parameters": 125.0, "size_mb": 500, "link": "https://huggingface.co/microsoft/codebert-base", }, "microsoft/graphcodebert-base": { "parameters": 125.0, "size_mb": 500, "link": "https://huggingface.co/microsoft/graphcodebert-base", }, "huggingface/CodeBERTa-small-v1": { "parameters": 84.0, "size_mb": 340, "link": "https://huggingface.co/huggingface/CodeBERTa-small-v1", }, "sentence-transformers/all-mpnet-base-v2": { "parameters": 109.0, "size_mb": 440, "link": "https://huggingface.co/sentence-transformers/all-mpnet-base-v2", }, "Alibaba-NLP/gte-Qwen2-1.5B-instruct": { "parameters": 1500.0, "size_mb": 3000, "link": "https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct", }, } # Distilled model specifications DISTILLED_MODEL_SPECS = { "parameters": 39.0, # Model2Vec parameters "size_mb": 149.0, # Actual model size "dimensions": 256, # Model2Vec dimensions "original_dimensions": 3584, "distillation_method": "Model2Vec", "training_dataset": "CodeSearchNet", } # 
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def setup_directories(base_path: Path | None = None) -> tuple[Path, Path, Path]:
    """Create necessary directories and return their paths."""
    if base_path:
        output_dir = base_path / "analysis_results"
        images_dir = base_path / "analysis_results" / "charts"
        reports_dir = base_path / "analysis_results" / "reports"
    else:
        output_dir = Path()  # Use current directory
        images_dir = IMAGES_DIR  # Use analysis_charts
        reports_dir = Path()  # Use current directory for reports

    # Only create directories that we actually use
    images_dir.mkdir(parents=True, exist_ok=True)

    return output_dir, images_dir, reports_dir


def extract_model_name_from_filename(filename: str) -> str:
    """Extract and map model name from filename."""
    # Remove prefixes and extensions
    name = filename.replace("codesearchnet_eval_", "").replace("benchmark_", "").replace(".json", "")

    # Check if it's in our mapping
    if name in MODEL_NAME_MAPPING:
        return MODEL_NAME_MAPPING[name]["name"]

    # Try to find partial matches
    for file_key, model_info in MODEL_NAME_MAPPING.items():
        if file_key in name or name in file_key:
            return model_info["name"]

    # If no mapping found, return the cleaned name
    return name


def get_model_link(model_name: str) -> str:
    """Get HuggingFace link for a model."""
    # First try direct lookup by file key
    for model_info in MODEL_NAME_MAPPING.values():
        if model_info["name"] == model_name:
            return model_info["link"]

    # Try partial matches
    for model_info in MODEL_NAME_MAPPING.values():
        if model_name.lower() in model_info["name"].lower() or model_info["name"].lower() in model_name.lower():
            return model_info["link"]

    # If no mapping found, construct link from model name
    if "/" in model_name:
        return f"https://huggingface.co/{model_name}"
    return ""


def format_model_with_link(model_name: str) -> str:
    """Format model name with markdown link."""
    link = get_model_link(model_name)
    if link:
        return f"[{model_name}]({link})"
    return model_name


def get_teacher_model_info(model_display_name: str) -> tuple[str, str]:
    """Extract teacher model name and link from distilled model display name."""
    # Mapping from model display patterns to teacher models
    teacher_mapping = {
        "all_MiniLM_L6_v2": (
            "sentence-transformers/all-MiniLM-L6-v2",
            "https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2",
        ),
        "all_mpnet_base_v2": (
            "sentence-transformers/all-mpnet-base-v2",
            "https://huggingface.co/sentence-transformers/all-mpnet-base-v2",
        ),
        "paraphrase_MiniLM_L6_v2": (
            "sentence-transformers/paraphrase-MiniLM-L6-v2",
            "https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2",
        ),
        "codebert_base": ("microsoft/codebert-base", "https://huggingface.co/microsoft/codebert-base"),
        "graphcodebert_base": ("microsoft/graphcodebert-base", "https://huggingface.co/microsoft/graphcodebert-base"),
        "gte_Qwen2_1.5B_instruct": (
            "Alibaba-NLP/gte-Qwen2-1.5B-instruct",
            "https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct",
        ),
        "bge_m3": ("BAAI/bge-m3", "https://huggingface.co/BAAI/bge-m3"),
        "jina_embeddings_v2_base_code": (
            "jinaai/jina-embeddings-v2-base-code",
            "https://huggingface.co/jinaai/jina-embeddings-v2-base-code",
        ),
        "jina_embeddings_v3": ("jinaai/jina-embeddings-v3", "https://huggingface.co/jinaai/jina-embeddings-v3"),
        "nomic_embed_text_v2_moe": (
            "nomic-ai/nomic-embed-text-v2-moe",
            "https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe",
        ),
        "Qodo_Embed_1_1.5B": ("Qodo/Qodo-Embed-1-1.5B", "https://huggingface.co/Qodo/Qodo-Embed-1-1.5B"),
        "Reason_ModernColBERT": (
            "lightonai/Reason-ModernColBERT",
            "https://huggingface.co/lightonai/Reason-ModernColBERT",
        ),
        "Linq_Embed_Mistral": (
            "Linq-AI-Research/Linq-Embed-Mistral",
            "https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral",
        ),
        "bge_code_v1": ("BAAI/bge-code-v1", "https://huggingface.co/BAAI/bge-code-v1"),
        "SFR_Embedding_Code_2B_R": (
            "Salesforce/SFR-Embedding-Code-2B_R",
            "https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R",
        ),
    }

    for pattern, (teacher_name, teacher_link) in teacher_mapping.items():
        if pattern in model_display_name:
            return teacher_name, teacher_link

    return "Unknown", ""


class CodeSearchNetAnalyzer:
    """Analyzer for CodeSearchNet evaluation results and performance benchmarks."""

    def __init__(
        self,
        results_dir: str | None = None,
        benchmark_dir: str | None = None,
        images_dir: Path | None = None,
    ) -> None:
        """Initialize analyzer with results directories."""
        self.results_dir = Path(results_dir) if results_dir else Path(DEFAULT_EVALUATION_DIR)
        self.benchmark_dir = Path(benchmark_dir) if benchmark_dir else Path(DEFAULT_BENCHMARK_DIR)
        self.images_dir = images_dir or IMAGES_DIR
        self.results: list[dict[str, Any]] = []
        self.benchmark_results: list[dict[str, Any]] = []
        self.comparison_df: pd.DataFrame | None = None
        self.benchmark_df: pd.DataFrame | None = None
        self.model_specs: dict[str, dict[str, Any]] = {}  # Store actual model specifications

    def load_benchmark_results(self) -> None:
        """Load benchmark results from comprehensive evaluation files."""
        logger.info("📊 Loading benchmark results from comprehensive evaluations...")

        if not self.results_dir.exists():
            logger.warning(f"Evaluation directory not found: {self.results_dir}")
            return

        logger.info(f"🔍 Searching for comprehensive evaluation files in: {self.results_dir}")

        # Look for both new comprehensive format and legacy formats
        comprehensive_files = list(self.results_dir.glob("comprehensive_eval_*.json"))
        legacy_files = list(self.results_dir.glob("codesearchnet_eval_*.json"))
        all_files = comprehensive_files + legacy_files

        logger.info(
            f"📁 Found {len(all_files)} evaluation files ({len(comprehensive_files)} comprehensive, {len(legacy_files)} legacy)"
        )

        for eval_file_path in all_files:
            try:
                logger.info(f"📖 Loading: {eval_file_path.name}")
                with eval_file_path.open() as f:
                    data = json.load(f)

                if data is not None:
                    if not isinstance(data, dict):
                        logger.warning(f"⚠️ Skipping {eval_file_path.name} (not a dict)")
                        continue

                    # Extract benchmark data if available
                    benchmark_data = self._extract_benchmark_data(data, eval_file_path)
                    if benchmark_data:
                        self.benchmark_results.append(benchmark_data)
                        logger.info(f"✅ Successfully loaded benchmark data: {benchmark_data['model_name']}")

            except (json.JSONDecodeError, KeyError) as e:
                logger.warning(f"❌ Failed to load {eval_file_path}: {e}")

        logger.info(f"📊 Total benchmark results loaded: {len(self.benchmark_results)}")
        if self.benchmark_results:
            model_names = [r.get("model_name", "Unknown") for r in self.benchmark_results]
            logger.info(f"🎯 Benchmark models found: {', '.join(model_names)}")
            self._create_benchmark_dataframe()

    def _extract_benchmark_data(self, data: dict, file_path: Path) -> dict[str, Any] | None:
        """Extract benchmark data from comprehensive evaluation results."""
        # Check if this evaluation contains benchmark data
        if data.get("benchmark_skipped", False):
            return None

        # Check for
benchmark fields if not any(key in data for key in ["size_metrics", "speed_benchmarks", "memory_benchmarks", "cpu_vs_gpu"]): return None # Extract model name original_name = data.get("model_name") or "Unknown" mapped_name = extract_model_name_from_filename( file_path.stem.replace("comprehensive_eval_", "").replace("codesearchnet_eval_", "") ) # Create benchmark result structure result: dict[str, Any] = { "model_name": mapped_name, "original_model_name": original_name, "size_metrics": data.get("size_metrics", {}), "speed_benchmarks": data.get("speed_benchmarks", {}), "memory_benchmarks": data.get("memory_benchmarks", {}), "cpu_vs_gpu": data.get("cpu_vs_gpu", {}), } return result def _create_benchmark_dataframe(self) -> None: """Create benchmark comparison DataFrame from results.""" if not self.benchmark_results: return benchmark_data = [] for result in self.benchmark_results: model_name = result.get("model_name", "Unknown") size_metrics = result.get("size_metrics", {}) speed_benchmarks = result.get("speed_benchmarks", {}) memory_benchmarks = result.get("memory_benchmarks", {}) cpu_vs_gpu = result.get("cpu_vs_gpu", {}) # Extract key metrics row = { "Model": model_name, "Disk_Size_MB": size_metrics.get("disk_size_mb", 0), "Parameters_M": size_metrics.get("parameters_millions", 0), "Embedding_Dim": size_metrics.get("embedding_dim", 0), "RAM_Usage_MB": size_metrics.get("ram_usage_mb", 0), "GPU_Memory_MB": size_metrics.get("gpu_memory_mb", 0), } # Speed metrics (medium texts, batch 32) if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]: batch_32 = speed_benchmarks["medium"]["batch_32"] row.update( { "Throughput_TextsPerSec": batch_32.get("texts_per_second", 0), "Latency_MsPerText": batch_32.get("time_per_text_ms", 0), "TokenSpeed_TokensPerSec": batch_32.get("tokens_per_second", 0), } ) # Memory scaling (batch 32) if "batch_32" in memory_benchmarks: batch_32_mem = memory_benchmarks["batch_32"] if not batch_32_mem.get("oom", False) and "error" not in batch_32_mem: row.update( { "Memory_Used_MB": batch_32_mem.get("memory_used_mb", 0), "Memory_Per_Text_MB": batch_32_mem.get("memory_per_text_mb", 0), } ) # CPU vs GPU comparison for device, metrics in cpu_vs_gpu.items(): if isinstance(metrics, dict) and "error" not in metrics: device_key = f"{device.upper()}_TextsPerSec" row[device_key] = metrics.get("texts_per_second", 0) benchmark_data.append(row) self.benchmark_df = pd.DataFrame(benchmark_data) def analyze_our_model_specifications(self) -> None: """Analyze actual model specifications for our distilled models.""" logger.info("🔍 Analyzing model specifications for our distilled models...") # Look for our models in the code_model2vec/final directory final_models_dir = Path("code_model2vec/final") if not final_models_dir.exists(): logger.warning(f"Final models directory not found: {final_models_dir}") return # Find all our model directories our_model_dirs = [ model_dir for model_dir in final_models_dir.iterdir() if model_dir.is_dir() and "code_model2vec" in model_dir.name ] logger.info(f"📁 Found {len(our_model_dirs)} distilled model directories") for model_dir in our_model_dirs: model_name = model_dir.name logger.info(f"📊 Analyzing model: {model_name}") try: # Try to load the model and get specifications from distiller.model2vec import StaticModel model = StaticModel.from_pretrained(str(model_dir)) # Get model specifications vocab_size = len(model.tokens) embedding_dim = model.dim total_params = vocab_size * embedding_dim # Get file size information model_file = model_dir / 
"model.safetensors" disk_size_mb: float = 0.0 if model_file.exists(): disk_size_mb = float(model_file.stat().st_size / (1024 * 1024)) # Convert to MB # Store specifications self.model_specs[model_name] = { "vocabulary_size": vocab_size, "embedding_dimensions": embedding_dim, "total_parameters": total_params, "parameters_millions": total_params / 1_000_000, "disk_size_mb": disk_size_mb, "model_path": str(model_dir), "analysis_successful": True, } logger.info( f"✅ {model_name}: {vocab_size:,} vocab, {embedding_dim} dims, {total_params:,} params ({total_params / 1_000_000:.1f}M)" ) except Exception as e: logger.warning(f"❌ Failed to analyze {model_name}: {e}") self.model_specs[model_name] = { "analysis_successful": False, "error": str(e), "model_path": str(model_dir), } logger.info( f"📊 Successfully analyzed {len([s for s in self.model_specs.values() if s.get('analysis_successful', False)])} models" ) def load_results(self) -> None: """Load evaluation results from local directory.""" logger.info("🔍 Loading evaluation results...") if not self.results_dir.exists(): logger.warning(f"Evaluation directory not found: {self.results_dir}") return logger.info(f"🔍 Searching for evaluation files in: {self.results_dir}") # Look for both new comprehensive format and legacy formats comprehensive_files = list(self.results_dir.glob("comprehensive_eval_*.json")) legacy_files = list(self.results_dir.glob("codesearchnet_eval_*.json")) all_files = comprehensive_files + legacy_files logger.info( f"📁 Found {len(all_files)} evaluation files ({len(comprehensive_files)} comprehensive, {len(legacy_files)} legacy)" ) for json_file in all_files: try: logger.info(f"📖 Loading: {json_file.name}") with json_file.open() as f: data = json.load(f) if data is not None: if not isinstance(data, dict): logger.warning(f"⚠️ Skipping {json_file.name} (not a dict)") continue # Normalize data format for analysis normalized_data = self._normalize_evaluation_data(data, json_file) self.results.append(normalized_data) logger.info(f"✅ Successfully loaded: {normalized_data['model_name']}") except (json.JSONDecodeError, KeyError) as e: logger.warning(f"❌ Failed to load {json_file}: {e}") logger.info(f"📊 Total loaded: {len(self.results)} model results") if self.results: model_names = [r.get("model_name", "Unknown") for r in self.results] logger.info(f"🎯 Models found: {', '.join(model_names)}") self._create_comparison_dataframe() # Also load benchmark results self.load_benchmark_results() # Analyze actual model specifications for our models self.analyze_our_model_specifications() def _normalize_evaluation_data(self, data: dict, file_path: Path) -> dict[str, Any]: """Normalize evaluation data to consistent format for analysis.""" # Extract model name original_name = data.get("model_name", "Unknown") file_stem = file_path.stem.replace("comprehensive_eval_", "").replace("codesearchnet_eval_", "") mapped_name = extract_model_name_from_filename(file_stem) # Handle comprehensive format (new) if "codesearch_overall" in data and "codesearch_languages" in data: result = { "model_name": mapped_name, "original_model_name": original_name, "overall": data.get("codesearch_overall", {}), "languages": data.get("codesearch_languages", {}), } # Handle legacy format (old codesearchnet_eval files) else: result = { "model_name": mapped_name, "original_model_name": original_name, "overall": data.get("overall", {}), "languages": data.get("languages", {}), } return result def _create_comparison_dataframe(self) -> None: """Create comparison DataFrame from results.""" 
if not self.results: return comparison_data = [] for result in self.results: overall = result.get("overall", {}) row = { "Model": result["model_name"], "MRR": overall.get("mrr", 0), "NDCG@1": overall.get("ndcg@1", 0), "NDCG@5": overall.get("ndcg@5", 0), "NDCG@10": overall.get("ndcg@10", 0), "Recall@1": overall.get("recall@1", 0), "Recall@5": overall.get("recall@5", 0), "Recall@10": overall.get("recall@10", 0), "Mean_Rank": overall.get("mean_rank", 0), "Median_Rank": overall.get("median_rank", 0), } comparison_data.append(row) self.comparison_df = pd.DataFrame(comparison_data) if not self.comparison_df.empty: self.comparison_df = self.comparison_df.sort_values("NDCG@10", ascending=False) def print_summary(self) -> None: """Print summary of results.""" if not self.results: logger.warning("No results to summarize") return print(f"\n{'=' * 60}") print("CodeSearchNet Evaluation Summary") print(f"{'=' * 60}") print(f"Total models evaluated: {len(self.results)}") if self.comparison_df is not None and not self.comparison_df.empty: print(f"\nTop performing model: {self.comparison_df.iloc[0]['Model']}") print(f"Best NDCG@10: {self.comparison_df.iloc[0]['NDCG@10']:.4f}") print(f"Best MRR: {self.comparison_df['MRR'].max():.4f}") print(f"\nEvaluated languages: {', '.join(CODE_LANGUAGES)}") # Also print benchmark summary if available if self.benchmark_results: print(f"\n{'=' * 60}") print("Performance Benchmark Summary") print(f"{'=' * 60}") print(f"Total models benchmarked: {len(self.benchmark_results)}") if self.benchmark_df is not None and not self.benchmark_df.empty: # Safely get fastest and smallest models fastest_model = "N/A" smallest_model = "N/A" if "Throughput_TextsPerSec" in self.benchmark_df.columns: fastest_idx = self.benchmark_df["Throughput_TextsPerSec"].idxmax() fastest_model = str(self.benchmark_df.loc[fastest_idx, "Model"]) if "Disk_Size_MB" in self.benchmark_df.columns: smallest_idx = self.benchmark_df["Disk_Size_MB"].idxmin() smallest_model = str(self.benchmark_df.loc[smallest_idx, "Model"]) print(f"\nFastest model: {fastest_model}") print(f"Smallest model: {smallest_model}") def analyze_language_performance(self) -> None: """Analyze performance across programming languages.""" if not self.results: return print(f"\n{'=' * 60}") print("Language-Specific Performance Analysis") print(f"{'=' * 60}") for result in self.results: model_name = result["model_name"] print(f"\nModel: {model_name}") print("-" * 40) languages = result.get("languages", {}) lang_data = [] for lang, lang_results in languages.items(): metrics = lang_results.get("metrics", {}) lang_data.append( { "Language": lang, "NDCG@10": metrics.get("ndcg@10", 0), "MRR": metrics.get("mrr", 0), "Recall@5": metrics.get("recall@5", 0), "Queries": lang_results.get("num_queries", 0), } ) if lang_data: lang_df = pd.DataFrame(lang_data) print(lang_df.to_string(index=False, float_format="%.4f")) print(f"\nBest language: {lang_df.loc[lang_df['NDCG@10'].idxmax(), 'Language']}") print(f"Average NDCG@10: {lang_df['NDCG@10'].mean():.4f}") print(f"Average queries per language: {lang_df['Queries'].mean():.0f}") def analyze_benchmark_performance(self) -> None: """Analyze and print benchmark performance summary.""" if not self.benchmark_results: logger.warning("No benchmark results to analyze") return print(f"\n{'=' * 60}") print("Performance Benchmark Analysis") print(f"{'=' * 60}") for result in self.benchmark_results: model_name = result.get("model_name", "Unknown") print(f"\nModel: {model_name}") print("-" * 40) # Size metrics size_metrics = 
result.get("size_metrics", {}) if size_metrics: print("📏 Model Size:") print(f" Disk Size: {size_metrics.get('disk_size_mb', 0):.1f} MB") if "parameters_millions" in size_metrics: print(f" Parameters: {size_metrics['parameters_millions']:.1f}M") if "embedding_dim" in size_metrics: print(f" Embedding Dimension: {size_metrics['embedding_dim']}") # Speed metrics speed_benchmarks = result.get("speed_benchmarks", {}) if "medium" in speed_benchmarks and "batch_32" in speed_benchmarks["medium"]: batch_32 = speed_benchmarks["medium"]["batch_32"] print("⚡ Performance (Batch 32, Medium Texts):") print(f" Throughput: {batch_32.get('texts_per_second', 0):.1f} texts/sec") print(f" Latency: {batch_32.get('time_per_text_ms', 0):.1f} ms/text") print(f" Token Speed: {batch_32.get('tokens_per_second', 0):.0f} tokens/sec") # CPU vs GPU cpu_vs_gpu = result.get("cpu_vs_gpu", {}) if cpu_vs_gpu: print("🖥️ CPU vs GPU:") for device, metrics in cpu_vs_gpu.items(): if isinstance(metrics, dict) and "error" not in metrics: print(f" {device.upper()}: {metrics.get('texts_per_second', 0):.1f} texts/sec") # Memory efficiency memory_benchmarks = result.get("memory_benchmarks", {}) if "batch_32" in memory_benchmarks: batch_32_mem = memory_benchmarks["batch_32"] if not batch_32_mem.get("oom", False) and "error" not in batch_32_mem: print("💾 Memory Usage (Batch 32):") print(f" Total: {batch_32_mem.get('memory_used_mb', 0):.1f} MB") print(f" Per Text: {batch_32_mem.get('memory_per_text_mb', 0):.2f} MB") def create_performance_radar_chart(self, model_name: str, language_scores: dict[str, float]) -> str: """Create radar chart showing performance across languages.""" if not PLOTLY_AVAILABLE: logger.warning("Plotly not available, skipping radar chart") return "" languages = list(language_scores.keys()) scores = list(language_scores.values()) if not languages: return "" # Close the radar chart languages_closed = [*languages, languages[0]] scores_closed = [*scores, scores[0]] fig = go.Figure() fig.add_trace( go.Scatterpolar( r=scores_closed, theta=languages_closed, fill="toself", name=model_name, line_color="rgb(67, 147, 195)", fillcolor="rgba(67, 147, 195, 0.3)", ) ) fig.update_layout( polar={"radialaxis": {"visible": True, "range": [0, max(scores) * 1.1]}}, showlegend=True, title=f"CodeSearchNet Performance by Language: {model_name}", width=800, height=600, ) static_path = self.images_dir / "code_performance_radar.png" try: fig.write_image(str(static_path), width=800, height=600, scale=2) return str(static_path) except Exception as e: logger.warning(f"Could not create static image: {e}") return "" def create_comparative_radar_chart(self, simplified_models: list, peer_models: list) -> str: """Create comparative radar chart between best distilled model and top peer models.""" if not PLOTLY_AVAILABLE: logger.warning("Plotly not available, skipping comparative radar chart") return "" if not simplified_models: return "" # Get the best simplified model best_simplified = max(simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0)) # Get top 3 peer models by performance peer_models_sorted = sorted(peer_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0), reverse=True) top_peers = peer_models_sorted[:3] models_to_compare = [best_simplified, *top_peers] fig = go.Figure() # Define colors for each model colors = ["rgb(255, 99, 132)", "rgb(54, 162, 235)", "rgb(255, 205, 86)", "rgb(75, 192, 192)"] # Collect all scores to determine the appropriate range all_scores = [] for i, model_result in enumerate(models_to_compare): 
model_name = model_result["model_name"] languages = model_result.get("languages", {}) # Calculate language scores language_scores = {} for lang, lang_data in languages.items(): metrics = lang_data.get("metrics", {}) language_scores[lang.title()] = metrics.get("ndcg@10", 0) if language_scores: languages_list = list(language_scores.keys()) scores_list = list(language_scores.values()) all_scores.extend(scores_list) # Collect scores for range calculation # Close the radar chart languages_closed = [*languages_list, languages_list[0]] scores_closed = [*scores_list, scores_list[0]] # Determine line style - solid for best distilled, dash for peers line_dash = "solid" if i == 0 else "dash" line_width = 3 if i == 0 else 2 fig.add_trace( go.Scatterpolar( r=scores_closed, theta=languages_closed, fill="toself" if i == 0 else "none", name=model_name, line={"color": colors[i % len(colors)], "dash": line_dash, "width": line_width}, fillcolor=f"rgba{colors[i % len(colors)][3:-1]}, 0.2)" if i == 0 else None, ) ) # Calculate dynamic range based on actual data if all_scores: max_score = max(all_scores) # Set range to slightly above the maximum score with some padding range_max = min(1.0, max_score * 1.1) # Cap at 1.0 since NDCG@10 max is 1.0 else: range_max = 1.0 # Default fallback fig.update_layout( polar={"radialaxis": {"visible": True, "range": [0, range_max]}}, showlegend=True, title="Model Comparison: Best Distilled vs Top Peer Models", width=900, height=700, ) static_path = self.images_dir / "comparative_radar.png" try: fig.write_image(str(static_path), width=900, height=700, scale=2) return str(static_path) except Exception as e: logger.warning(f"Could not create comparative radar chart: {e}") return "" def create_individual_radar_charts(self, simplified_models: list) -> dict[str, str]: """Create individual radar charts for all simplified models.""" radar_charts = {} for result in simplified_models: model_name = result["model_name"] model_languages = result.get("languages", {}) model_language_scores = {} for lang, lang_data in model_languages.items(): metrics = lang_data.get("metrics", {}) model_language_scores[lang.title()] = metrics.get("ndcg@10", 0) if model_language_scores: # Create unique filename for each model safe_model_name = "".join(c for c in model_name if c.isalnum() or c in ("-", "_")).rstrip() radar_chart_path = self.create_performance_radar_chart_individual( model_name, model_language_scores, safe_model_name ) if radar_chart_path: radar_charts[model_name] = radar_chart_path return radar_charts def create_performance_radar_chart_individual( self, model_name: str, language_scores: dict[str, float], filename_suffix: str ) -> str: """Create radar chart for individual model with unique filename.""" if not PLOTLY_AVAILABLE: logger.warning("Plotly not available, skipping radar chart") return "" languages = list(language_scores.keys()) scores = list(language_scores.values()) if not languages: return "" # Close the radar chart languages_closed = [*languages, languages[0]] scores_closed = [*scores, scores[0]] fig = go.Figure() fig.add_trace( go.Scatterpolar( r=scores_closed, theta=languages_closed, fill="toself", name=model_name, line_color="rgb(67, 147, 195)", fillcolor="rgba(67, 147, 195, 0.3)", ) ) fig.update_layout( polar={"radialaxis": {"visible": True, "range": [0, max(scores) * 1.1]}}, showlegend=True, title=f"CodeSearchNet Performance by Language: {model_name}", width=800, height=600, ) static_path = self.images_dir / f"radar_{filename_suffix}.png" try: fig.write_image(str(static_path), 
width=800, height=600, scale=2) return str(static_path) except Exception as e: logger.warning(f"Could not create static image for {model_name}: {e}") return "" def plot_model_comparison(self, save_path: str | None = None) -> str: """Create comparison plots for models.""" if self.comparison_df is None or self.comparison_df.empty: logger.warning("No comparison data available for plotting") return "" fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle("CodeSearchNet Model Comparison", fontsize=16, fontweight="bold") # NDCG@10 comparison axes[0, 0].barh(self.comparison_df["Model"], self.comparison_df["NDCG@10"]) axes[0, 0].set_title("NDCG@10 Comparison") axes[0, 0].set_xlabel("NDCG@10") # MRR comparison axes[0, 1].barh(self.comparison_df["Model"], self.comparison_df["MRR"]) axes[0, 1].set_title("Mean Reciprocal Rank (MRR)") axes[0, 1].set_xlabel("MRR") # Recall@5 comparison axes[1, 0].barh(self.comparison_df["Model"], self.comparison_df["Recall@5"]) axes[1, 0].set_title("Recall@5") axes[1, 0].set_xlabel("Recall@5") # Mean Rank comparison (lower is better) axes[1, 1].barh(self.comparison_df["Model"], self.comparison_df["Mean_Rank"]) axes[1, 1].set_title("Mean Rank (lower is better)") axes[1, 1].set_xlabel("Mean Rank") plt.tight_layout() output_path = save_path or str(self.images_dir / "model_comparison.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def plot_language_heatmap(self, save_path: str | None = None) -> str: """Create a heatmap of performance across languages.""" if not self.results: return "" # Prepare data for heatmap heatmap_data = [] for result in self.results: model_name = result["model_name"] languages = result.get("languages", {}) row = {"Model": model_name} for lang in CODE_LANGUAGES: if lang in languages: metrics = languages[lang].get("metrics", {}) row[lang.title()] = metrics.get("ndcg@10", 0) else: row[lang.title()] = 0 heatmap_data.append(row) if not heatmap_data: return "" df = pd.DataFrame(heatmap_data).set_index("Model") plt.figure(figsize=(12, 8)) sns.heatmap( df, annot=True, fmt=".3f", cmap="RdYlBu_r", center=0.2, vmin=0, vmax=df.to_numpy().max(), cbar_kws={"label": "NDCG@10 Score"}, ) plt.title( "CodeSearchNet Performance Heatmap by Language", fontsize=16, fontweight="bold", ) plt.xlabel("Programming Language", fontsize=12) plt.ylabel("Model", fontsize=12) plt.tight_layout() output_path = save_path or str(self.images_dir / "language_heatmap.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def plot_benchmark_performance(self, save_path: str | None = None) -> str: """Create comprehensive benchmark performance plots.""" if not self.benchmark_results: logger.warning("No benchmark data available for plotting") return "" fig, axes = plt.subplots(2, 3, figsize=(18, 12)) fig.suptitle("Performance Benchmark Analysis", fontsize=16, fontweight="bold") # 1. Model Size Comparison if self.benchmark_df is not None and "Disk_Size_MB" in self.benchmark_df.columns: axes[0, 0].barh(self.benchmark_df["Model"], self.benchmark_df["Disk_Size_MB"]) axes[0, 0].set_title("Model Size (MB)") axes[0, 0].set_xlabel("Size (MB)") # 2. Inference Throughput if self.benchmark_df is not None and "Throughput_TextsPerSec" in self.benchmark_df.columns: axes[0, 1].barh(self.benchmark_df["Model"], self.benchmark_df["Throughput_TextsPerSec"]) axes[0, 1].set_title("Inference Throughput") axes[0, 1].set_xlabel("Texts/Second") # 3. 
Memory Usage if self.benchmark_df is not None and "Memory_Used_MB" in self.benchmark_df.columns: axes[0, 2].barh(self.benchmark_df["Model"], self.benchmark_df["Memory_Used_MB"]) axes[0, 2].set_title("Memory Usage (Batch 32)") axes[0, 2].set_xlabel("Memory (MB)") # 4. Latency Comparison if self.benchmark_df is not None and "Latency_MsPerText" in self.benchmark_df.columns: axes[1, 0].barh(self.benchmark_df["Model"], self.benchmark_df["Latency_MsPerText"]) axes[1, 0].set_title("Inference Latency") axes[1, 0].set_xlabel("Milliseconds/Text") # 5. CPU vs GPU Performance if self.benchmark_df is not None: cpu_col = "CPU_TextsPerSec" gpu_col = "CUDA_TextsPerSec" if cpu_col in self.benchmark_df.columns and gpu_col in self.benchmark_df.columns: x = np.arange(len(self.benchmark_df)) width = 0.35 axes[1, 1].bar(x - width / 2, self.benchmark_df[cpu_col], width, label="CPU", alpha=0.7) axes[1, 1].bar(x + width / 2, self.benchmark_df[gpu_col], width, label="GPU", alpha=0.7) axes[1, 1].set_title("CPU vs GPU Performance") axes[1, 1].set_ylabel("Texts/Second") axes[1, 1].set_xticks(x) axes[1, 1].set_xticklabels(self.benchmark_df["Model"], rotation=45, ha="right") axes[1, 1].legend() # 6. Parameter Efficiency if ( self.benchmark_df is not None and "Parameters_M" in self.benchmark_df.columns and "Throughput_TextsPerSec" in self.benchmark_df.columns ): # Efficiency = Throughput / Parameters (higher is better) efficiency = self.benchmark_df["Throughput_TextsPerSec"] / (self.benchmark_df["Parameters_M"] + 1e-6) axes[1, 2].barh(self.benchmark_df["Model"], efficiency) axes[1, 2].set_title("Parameter Efficiency") axes[1, 2].set_xlabel("Texts/Sec per Million Parameters") plt.tight_layout() output_path = save_path or str(self.images_dir / "benchmark_performance.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def plot_batch_size_scaling(self, save_path: str | None = None) -> str: """Create batch size scaling analysis plot.""" if not self.benchmark_results: return "" plt.figure(figsize=(12, 8)) for result in self.benchmark_results: model_name = result.get("model_name", "Unknown") speed_benchmarks = result.get("speed_benchmarks", {}) # Extract batch size performance for medium texts if "medium" in speed_benchmarks: batch_sizes = [] throughputs = [] for batch_key, metrics in speed_benchmarks["medium"].items(): if batch_key.startswith("batch_"): batch_size = int(batch_key.split("_")[1]) throughput = metrics.get("texts_per_second", 0) batch_sizes.append(batch_size) throughputs.append(throughput) if batch_sizes: plt.plot(batch_sizes, throughputs, marker="o", label=model_name, linewidth=2) plt.xlabel("Batch Size", fontsize=12) plt.ylabel("Throughput (Texts/Second)", fontsize=12) plt.title("Batch Size Scaling Performance", fontsize=16, fontweight="bold") plt.legend() plt.grid(visible=True, alpha=0.3) plt.xscale("log", base=2) output_path = save_path or str(self.images_dir / "batch_size_scaling.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def plot_memory_scaling(self, save_path: str | None = None) -> str: """Create memory scaling analysis plot.""" if not self.benchmark_results: return "" plt.figure(figsize=(12, 8)) for result in self.benchmark_results: model_name = result.get("model_name", "Unknown") memory_benchmarks = result.get("memory_benchmarks", {}) batch_sizes = [] memory_usage = [] for batch_key, metrics in memory_benchmarks.items(): if batch_key.startswith("batch_") and not metrics.get("oom", False) and "error" not in metrics: 
batch_size = int(batch_key.split("_")[1]) memory_mb = metrics.get("memory_used_mb", 0) batch_sizes.append(batch_size) memory_usage.append(memory_mb) if batch_sizes: plt.plot(batch_sizes, memory_usage, marker="s", label=model_name, linewidth=2) plt.xlabel("Batch Size", fontsize=12) plt.ylabel("Memory Usage (MB)", fontsize=12) plt.title("Memory Scaling by Batch Size", fontsize=16, fontweight="bold") plt.legend() plt.grid(visible=True, alpha=0.3) plt.xscale("log", base=2) output_path = save_path or str(self.images_dir / "memory_scaling.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def create_peer_comparison_chart(self, model_name: str) -> str: """Create comparison chart using actual evaluation results.""" if self.comparison_df is None or self.comparison_df.empty: logger.warning("No comparison data available for peer comparison chart") return "" # Use actual evaluation results instead of hardcoded scores df_sorted = self.comparison_df.sort_values("NDCG@10", ascending=True) plt.figure(figsize=(12, 8)) # Color models differently - highlight the user's model colors = [] for model in df_sorted["Model"]: if model_name.lower() in model.lower() or "gte_qwen2_m2v_code" in model.lower(): colors.append("red") # User's model else: colors.append("skyblue") # Peer models bars = plt.barh(df_sorted["Model"], df_sorted["NDCG@10"], color=colors) # Highlight current model with special formatting for i, model in enumerate(df_sorted["Model"]): if model_name.lower() in model.lower() or "gte_qwen2_m2v_code" in model.lower(): bars[i].set_alpha(0.8) bars[i].set_edgecolor("black") bars[i].set_linewidth(2) plt.xlabel("NDCG@10 Score", fontsize=12) plt.title( "CodeSearchNet Model Comparison (Actual Results)", fontsize=16, fontweight="bold", ) plt.grid(axis="x", alpha=0.3) # Add score labels for i, score in enumerate(df_sorted["NDCG@10"]): plt.text(score + 0.005, i, f"{score:.3f}", va="center") plt.tight_layout() output_path = self.images_dir / "peer_comparison.png" plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return str(output_path) def create_efficiency_analysis(self, model_name: str) -> str: """Create efficiency analysis chart using actual evaluation results.""" if self.comparison_df is None or self.comparison_df.empty: logger.warning("No comparison data available for efficiency analysis") return "" models = [] scores = [] params = [] is_user_model = [] # Process all evaluated models for _, row in self.comparison_df.iterrows(): model_display_name = row["Model"] current_model_score = row["NDCG@10"] # Determine if this is the user's model is_users = ( model_name.lower() in model_display_name.lower() or "gte_qwen2_m2v_code" in model_display_name.lower() ) if is_users: # User's distilled model models.append(model_display_name) # Safe conversion to float for pandas values score_value = pd.to_numeric(current_model_score, errors="coerce") scores.append(float(score_value) if not pd.isna(score_value) else 0.0) # Safe conversion for DISTILLED_MODEL_SPECS parameters param_value = DISTILLED_MODEL_SPECS.get("parameters", 39) params.append(float(param_value) if isinstance(param_value, (int, float)) else 39.0) is_user_model.append(True) else: # Find corresponding peer model specs model_key = None for peer_key in MODEL_SPECS: peer_short_name = peer_key.split("/")[-1].lower() if peer_short_name in model_display_name.lower(): model_key = peer_key break if model_key and model_key in MODEL_SPECS: models.append(model_display_name.split("/")[-1]) # Short name # Safe 
conversion to float for pandas values score_value = pd.to_numeric(current_model_score, errors="coerce") scores.append(float(score_value) if not pd.isna(score_value) else 0.0) param_value = MODEL_SPECS[model_key].get("parameters", 100.0) params.append(float(param_value) if isinstance(param_value, (int, float)) else 100.0) is_user_model.append(False) if not models: logger.warning("No models with parameter specifications found") return "" plt.figure(figsize=(12, 8)) # Plot peer models peer_models = [m for i, m in enumerate(models) if not is_user_model[i]] peer_params = [p for i, p in enumerate(params) if not is_user_model[i]] peer_scores = [s for i, s in enumerate(scores) if not is_user_model[i]] if peer_models: plt.scatter( peer_params, peer_scores, s=100, alpha=0.6, label="Peer Models", color="skyblue", ) # Plot user's model user_models = [m for i, m in enumerate(models) if is_user_model[i]] user_params = [p for i, p in enumerate(params) if is_user_model[i]] user_scores = [s for i, s in enumerate(scores) if is_user_model[i]] if user_models: plt.scatter( user_params, user_scores, s=200, color="red", alpha=0.8, label=f"{user_models[0]} (Distilled)", marker="*", ) # Add model labels for i, (model, param, score) in enumerate(zip(models, params, scores, strict=False)): if is_user_model[i]: plt.annotate( model, (param, score), xytext=(10, 10), textcoords="offset points", fontweight="bold", color="red", ) else: plt.annotate( model, (param, score), xytext=(5, 5), textcoords="offset points", fontsize=9, ) plt.xlabel("Model Size (Million Parameters)", fontsize=12) plt.ylabel("NDCG@10 Score", fontsize=12) plt.title( "Model Efficiency: Performance vs Size (Actual Results)", fontsize=16, fontweight="bold", ) plt.legend() plt.grid(visible=True, alpha=0.3) plt.xscale("log") plt.tight_layout() output_path = self.images_dir / "efficiency_analysis.png" plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return str(output_path) def plot_model_specifications(self, save_path: str | None = None) -> str: """Create visualization of our model specifications.""" if not self.model_specs: logger.warning("No model specifications available for plotting") return "" # Filter only successfully analyzed models successful_specs = {k: v for k, v in self.model_specs.items() if v.get("analysis_successful", False)} if not successful_specs: logger.warning("No successfully analyzed models for plotting") return "" fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle("Our Distilled Models - Specifications Analysis", fontsize=16, fontweight="bold") # Extract data model_names = list(successful_specs.keys()) # Shorten model names for better display display_names = [name.replace("code_model2vec_", "").replace("_", " ") for name in model_names] vocab_sizes = [spec["vocabulary_size"] for spec in successful_specs.values()] param_counts = [spec["parameters_millions"] for spec in successful_specs.values()] embed_dims = [spec["embedding_dimensions"] for spec in successful_specs.values()] disk_sizes = [spec["disk_size_mb"] for spec in successful_specs.values()] # 1. Vocabulary Size Comparison axes[0, 0].barh(display_names, vocab_sizes, color="skyblue") axes[0, 0].set_title("Vocabulary Size") axes[0, 0].set_xlabel("Number of Tokens") for i, v in enumerate(vocab_sizes): axes[0, 0].text(v + max(vocab_sizes) * 0.01, i, f"{v:,}", va="center", fontsize=9) # 2. 
Parameter Count Comparison axes[0, 1].barh(display_names, param_counts, color="lightgreen") axes[0, 1].set_title("Model Parameters") axes[0, 1].set_xlabel("Parameters (Millions)") for i, v in enumerate(param_counts): axes[0, 1].text(v + max(param_counts) * 0.01, i, f"{v:.1f}M", va="center", fontsize=9) # 3. Embedding Dimensions axes[1, 0].barh(display_names, embed_dims, color="lightsalmon") axes[1, 0].set_title("Embedding Dimensions") axes[1, 0].set_xlabel("Dimensions") for i, v in enumerate(embed_dims): axes[1, 0].text(v + max(embed_dims) * 0.01, i, f"{v}", va="center", fontsize=9) # 4. Disk Size axes[1, 1].barh(display_names, disk_sizes, color="plum") axes[1, 1].set_title("Model Size on Disk") axes[1, 1].set_xlabel("Size (MB)") for i, v in enumerate(disk_sizes): axes[1, 1].text(v + max(disk_sizes) * 0.01, i, f"{v:.1f}MB", va="center", fontsize=9) plt.tight_layout() output_path = save_path or str(self.images_dir / "model_specifications.png") plt.savefig(output_path, dpi=300, bbox_inches="tight") plt.close() return output_path def generate_comprehensive_report(self, model_name: str = "Simplified Distillation Models") -> str: """Generate comprehensive markdown report for all evaluated models.""" if not self.results: logger.error("No results to analyze") return "" # Find all simplified distillation models simplified_models = [] peer_models = [] for result in self.results: result_model_name = result["model_name"] if ( "code_model2vec" in result_model_name.lower() or "distilled" in result_model_name.lower() or "(ours)" in result_model_name.lower() ): simplified_models.append(result) else: peer_models.append(result) # Get the best performing simplified model for main analysis if simplified_models: main_result = max(simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0)) main_model_name = main_result["model_name"] else: # Fallback to first result if no simplified models found main_result = self.results[0] main_model_name = main_result["model_name"] overall = main_result.get("overall", {}) languages = main_result.get("languages", {}) # Calculate language scores for radar chart language_scores = {} for lang, lang_data in languages.items(): metrics = lang_data.get("metrics", {}) language_scores[lang.title()] = metrics.get("ndcg@10", 0) # Create visualizations logger.info("Generating visualizations...") output_dir, images_dir, reports_dir = setup_directories() self.create_performance_radar_chart(main_model_name, language_scores) comparison_chart = self.plot_model_comparison() heatmap_chart = self.plot_language_heatmap() peer_chart = self.create_peer_comparison_chart(main_model_name) efficiency_chart = self.create_efficiency_analysis(main_model_name) model_specs_chart = self.plot_model_specifications() # Generate individual radar charts for all simplified models individual_radar_charts = self.create_individual_radar_charts(simplified_models) # Create comparative radar chart (best distilled vs top peer models) comparative_radar_chart = self.create_comparative_radar_chart(simplified_models, peer_models) # Create benchmark visualizations benchmark_chart = "" batch_scaling_chart = "" memory_scaling_chart = "" if self.benchmark_results: benchmark_chart = self.plot_benchmark_performance() batch_scaling_chart = self.plot_batch_size_scaling() memory_scaling_chart = self.plot_memory_scaling() # Generate report report = f"""# Code-Specialized Model2Vec Distillation Analysis ## 🎯 Executive Summary This report presents a comprehensive analysis of Model2Vec distillation experiments using 
different teacher models for code-specialized embedding generation. ### Evaluated Models Overview **Simplified Distillation Models:** {len(simplified_models)} **Peer Comparison Models:** {len(peer_models)} **Total Models Analyzed:** {len(self.results)} ### Best Performing Simplified Model: {main_model_name} **Overall CodeSearchNet Performance:** - **NDCG@10**: {overall.get("ndcg@10", 0):.4f} - **Mean Reciprocal Rank (MRR)**: {overall.get("mrr", 0):.4f} - **Recall@5**: {overall.get("recall@5", 0):.4f} - **Mean Rank**: {overall.get("mean_rank", 0):.1f} ## 📊 Comprehensive Model Comparison ### All Simplified Distillation Models Performance """ # Add table of all simplified models if simplified_models: report += "| Model | Teacher | NDCG@10 | MRR | Recall@5 | Status |\n" report += "|-------|---------|---------|-----|----------|--------|\n" # Sort by performance simplified_models_sorted = sorted( simplified_models, key=lambda x: x.get("overall", {}).get("ndcg@10", 0), reverse=True ) for rank, result in enumerate(simplified_models_sorted, 1): model_display = result["model_name"] overall_metrics = result.get("overall", {}) # Extract teacher model name from model name teacher_name, teacher_link = get_teacher_model_info(model_display) status = "🥇 Best" if rank == 1 else "🥈 2nd" if rank == 2 else "🥉 3rd" if rank == 3 else f"#{rank}" # Use linked teacher name if available teacher_display = f"[{teacher_name}]({teacher_link})" if teacher_link else teacher_name report += f"| {model_display} | {teacher_display} | {overall_metrics.get('ndcg@10', 0):.4f} | {overall_metrics.get('mrr', 0):.4f} | {overall_metrics.get('recall@5', 0):.4f} | {status} |\n" # Add model specifications section if self.model_specs: successful_specs = {k: v for k, v in self.model_specs.items() if v.get("analysis_successful", False)} if successful_specs: report += """ ### 📊 Model Specifications Analysis Our distilled models exhibit consistent architectural characteristics across different teacher models: | Model | Vocabulary Size | Parameters | Embedding Dim | Disk Size | |-------|----------------|------------|---------------|-----------| """ # Sort models by performance for consistency for result in simplified_models_sorted: model_display = result["model_name"] if model_display in successful_specs: spec = successful_specs[model_display] vocab_size = spec["vocabulary_size"] params_m = spec["parameters_millions"] embed_dim = spec["embedding_dimensions"] disk_size = spec["disk_size_mb"] report += f"| {model_display.replace('code_model2vec_', '')} | {vocab_size:,} | {params_m:.1f}M | {embed_dim} | {disk_size:.1f}MB |\n" if model_specs_chart: report += f""" ![Model Specifications]({model_specs_chart}) *Comprehensive analysis of our distilled models showing vocabulary size, parameter count, embedding dimensions, and storage requirements.* #### Key Insights from Model Specifications: """ # Calculate some insights vocab_sizes = [spec["vocabulary_size"] for spec in successful_specs.values()] param_counts = [spec["parameters_millions"] for spec in successful_specs.values()] embed_dims = [spec["embedding_dimensions"] for spec in successful_specs.values()] disk_sizes = [spec["disk_size_mb"] for spec in successful_specs.values()] if vocab_sizes: avg_vocab = sum(vocab_sizes) / len(vocab_sizes) avg_params = sum(param_counts) / len(param_counts) avg_disk = sum(disk_sizes) / len(disk_sizes) report += f""" - **Vocabulary Consistency**: All models use vocabulary sizes ranging from {min(vocab_sizes):,} to {max(vocab_sizes):,} tokens (avg: 
{avg_vocab:,.0f}) - **Parameter Efficiency**: Models range from {min(param_counts):.1f}M to {max(param_counts):.1f}M parameters (avg: {avg_params:.1f}M) - **Storage Efficiency**: Disk usage ranges from {min(disk_sizes):.1f}MB to {max(disk_sizes):.1f}MB (avg: {avg_disk:.1f}MB) - **Embedding Dimensions**: Consistent {embed_dims[0]} dimensions across all models (optimized for efficiency) """ report += """ ### Key Findings """ if simplified_models and len(simplified_models) > 1: best_model = simplified_models_sorted[0] worst_model = simplified_models_sorted[-1] best_score = best_model.get("overall", {}).get("ndcg@10", 0) worst_score = worst_model.get("overall", {}).get("ndcg@10", 0) report += f""" - **Best Teacher Model**: {best_model["model_name"]} (NDCG@10: {best_score:.4f}) - **Least Effective Teacher**: {worst_model["model_name"]} (NDCG@10: {worst_score:.4f}) - **Performance Range**: {((best_score - worst_score) / best_score * 100):.1f}% difference between best and worst - **Average Performance**: {sum(r.get("overall", {}).get("ndcg@10", 0) for r in simplified_models) / len(simplified_models):.4f} NDCG@10 """ # Add radar charts section report += """ ## 🎯 Language Performance Radar Charts ### Best Model vs Peer Models Comparison """ if comparative_radar_chart: report += f"![Comparative Radar Chart]({comparative_radar_chart})\n\n" report += "*Comparative view showing how the best simplified distillation model performs against top peer models across programming languages.*\n\n" # Add individual radar charts for all simplified models (sorted by performance) if individual_radar_charts: report += "### Individual Model Performance by Language\n\n" # Sort the radar charts by model performance (best to worst) for result in simplified_models_sorted: chart_model_name = result["model_name"] if chart_model_name in individual_radar_charts: chart_path = individual_radar_charts[chart_model_name] # Extract teacher name for cleaner display teacher_name, teacher_link = get_teacher_model_info(chart_model_name) # Use linked teacher name if available teacher_display = f"[{teacher_name}]({teacher_link})" if teacher_link else teacher_name # Get performance for display overall_metrics = result.get("overall", {}) ndcg_score = overall_metrics.get("ndcg@10", 0) report += f"#### {chart_model_name} (Teacher: {teacher_display}) - NDCG@10: {ndcg_score:.4f}\n\n" report += f"![{chart_model_name} Radar Chart]({chart_path})\n\n" report += f""" ## 🏆 Peer Model Comparison ![Peer Comparison]({peer_chart}) *Comparison with established code-specialized embedding models using actual evaluation results.* ### Complete Model Ranking """ # Add comprehensive ranking table if self.comparison_df is not None and len(self.comparison_df) > 0: report += "| Rank | Model | Type | NDCG@10 | MRR | Recall@5 |\n" report += "|------|-------|------|---------|-----|----------|\n" for rank in range(len(self.comparison_df)): row_data = self.comparison_df.iloc[rank] model_name_display = str(row_data["Model"]) # Determine model type if ( "code_model2vec" in model_name_display.lower() or "distilled" in model_name_display.lower() or "(ours)" in model_name_display.lower() ): # Check if it's a fine-tuned model if "fine_tuned" in model_name_display.lower(): model_type = "**🎓 Fine-tuned Distillation**" else: model_type = "**🔥 Simplified Distillation**" elif any(code_term in model_name_display.lower() for code_term in ["codebert", "graphcode", "codet5"]): model_type = "Code-Specific" elif "potion" in model_name_display.lower(): model_type = "Model2Vec" else: 
model_type = "General" report += f"| {rank + 1} | {model_name_display} | {model_type} | {row_data['NDCG@10']:.4f} | {row_data['MRR']:.4f} | {row_data['Recall@5']:.4f} |\n" report += f""" ## 📈 Performance Analysis ### Multi-Model Comparison Charts ![Model Comparison]({comparison_chart}) *Comprehensive comparison across all evaluation metrics.* ### Language Performance Analysis ![Language Heatmap]({heatmap_chart}) *Performance heatmap showing how different models perform across programming languages.* ### Efficiency Analysis ![Efficiency Analysis]({efficiency_chart}) *Performance vs model size analysis showing the efficiency benefits of distillation.* """ # Add benchmark analysis if available if self.benchmark_results: report += f""" ## ⚡ Operational Performance Analysis ![Benchmark Performance]({benchmark_chart}) *Comprehensive performance benchmarking across multiple operational metrics.* ### Performance Scaling Analysis ![Batch Size Scaling]({batch_scaling_chart}) *How performance scales with different batch sizes for optimal throughput.* ![Memory Scaling]({memory_scaling_chart}) *Memory usage patterns across different batch sizes.* """ # Add detailed language analysis report += """ ## 🔍 Language-Specific Analysis ### Performance by Programming Language """ if language_scores: report += "| Language | Best Model Performance | Average Performance | Language Difficulty |\n" report += "|----------|------------------------|--------------------|--------------------|\n" for lang in sorted(language_scores.keys()): # Find best performance for this language across all models lang_performances = [] for result in self.results: lang_data = result.get("languages", {}).get(lang.lower(), {}) if lang_data: lang_performances.append(lang_data.get("metrics", {}).get("ndcg@10", 0)) if lang_performances: best_lang_perf = max(lang_performances) avg_lang_perf = sum(lang_performances) / len(lang_performances) difficulty = "Easy" if avg_lang_perf > 0.3 else "Medium" if avg_lang_perf > 0.2 else "Hard" report += f"| {lang} | {best_lang_perf:.4f} | {avg_lang_perf:.4f} | {difficulty} |\n" report += """ ## 🎯 Conclusions and Recommendations ### Teacher Model Analysis Based on the evaluation results across all simplified distillation models: """ if simplified_models and len(simplified_models) > 1: # Analyze which teacher models work best teacher_performance = {} for result in simplified_models: model_name = result["model_name"] score = result.get("overall", {}).get("ndcg@10", 0) teacher_name, teacher_link = get_teacher_model_info(model_name) teacher_performance[teacher_name] = score if teacher_performance: best_teacher = max(teacher_performance.items(), key=lambda x: x[1]) worst_teacher = min(teacher_performance.items(), key=lambda x: x[1]) report += f""" 1. **Best Teacher Model**: {best_teacher[0]} (NDCG@10: {best_teacher[1]:.4f}) 2. **Least Effective Teacher**: {worst_teacher[0]} (NDCG@10: {worst_teacher[1]:.4f}) 3. 
3. **Teacher Model Impact**: Choice of teacher model affects performance by {((best_teacher[1] - worst_teacher[1]) / best_teacher[1] * 100):.1f}%

### Recommendations

- **For Production**: Use {best_teacher[0]} as the teacher model for best performance
- **For Efficiency**: Model2Vec distillation provides significant size reduction with competitive performance
- **For Code Tasks**: Specialized models consistently outperform general-purpose models
"""

        # The NDCG@10 and MRR metrics cited below are sketched, for reference only,
        # in the illustrative helpers placed after this class.
        report += f"""
## 📄 Methodology

### Evaluation Protocol

- **Dataset**: CodeSearchNet test sets for 6 programming languages
- **Metrics**: NDCG@k, MRR, Recall@k following CodeSearchNet methodology
- **Query Format**: Natural language documentation strings
- **Corpus Format**: Function code strings
- **Evaluation**: Retrieval of the correct code snippet for each documentation query

### Teacher Models Tested

- [sentence-transformers/all-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2) (proven baseline)
- [sentence-transformers/all-mpnet-base-v2](https://huggingface.co/sentence-transformers/all-mpnet-base-v2) (general purpose)
- [sentence-transformers/paraphrase-MiniLM-L6-v2](https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2) (paraphrase model)
- [microsoft/codebert-base](https://huggingface.co/microsoft/codebert-base) (code-specialized)
- [microsoft/graphcodebert-base](https://huggingface.co/microsoft/graphcodebert-base) (graph-aware code model)
- [Alibaba-NLP/gte-Qwen2-1.5B-instruct](https://huggingface.co/Alibaba-NLP/gte-Qwen2-1.5B-instruct) (instruction model)
- [BAAI/bge-m3](https://huggingface.co/BAAI/bge-m3) (multilingual model)
- [jinaai/jina-embeddings-v3](https://huggingface.co/jinaai/jina-embeddings-v3) (modern embedding model)
- [nomic-ai/nomic-embed-text-v2-moe](https://huggingface.co/nomic-ai/nomic-embed-text-v2-moe) (mixture of experts)
- [Qodo/Qodo-Embed-1-1.5B](https://huggingface.co/Qodo/Qodo-Embed-1-1.5B) (code-specialized)
- [lightonai/Reason-ModernColBERT](https://huggingface.co/lightonai/Reason-ModernColBERT) (ColBERT architecture)
- [Linq-AI-Research/Linq-Embed-Mistral](https://huggingface.co/Linq-AI-Research/Linq-Embed-Mistral) (Mistral-based)
- [BAAI/bge-code-v1](https://huggingface.co/BAAI/bge-code-v1) (code-specialized BGE)
- [Salesforce/SFR-Embedding-Code-2B_R](https://huggingface.co/Salesforce/SFR-Embedding-Code-2B_R) (large code model)

### Distillation Method

- **Technique**: Model2Vec static embedding generation
- **Parameters**: PCA dims=256, SIF coefficient=1e-3, Zipf weighting=True
- **Training Data**: CodeSearchNet comment-code pairs
- **Languages**: Python, JavaScript, Java, PHP, Ruby, Go

---

*Report generated on {time.strftime("%Y-%m-%d %H:%M:%S")} using automated analysis pipeline.*

*For questions about methodology or results, please refer to the CodeSearchNet documentation.*
"""

        return report

    def export_results(self, output_file: str) -> None:
        """Export results to CSV format."""
        if self.comparison_df is not None:
            self.comparison_df.to_csv(output_file, index=False)
            logger.info(f"Results exported to {output_file}")
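
# ---------------------------------------------------------------------------
# Illustrative metric definitions (reference only; not called by the pipeline).
#
# The Methodology section of the generated report cites NDCG@k, MRR and
# Recall@k. The two helpers below sketch how MRR and NDCG@10 can be computed
# from the 1-based rank of the correct result for each query, under the
# CodeSearchNet-style assumption of a single relevant item per query. The
# function names and signatures are illustrative, not part of the actual
# evaluation code.
# ---------------------------------------------------------------------------


def _illustrative_mrr(correct_ranks: list[int]) -> float:
    """Mean Reciprocal Rank: average of 1/rank of the correct hit per query."""
    if not correct_ranks:
        return 0.0
    return sum(1.0 / rank for rank in correct_ranks) / len(correct_ranks)


def _illustrative_ndcg_at_k(correct_ranks: list[int], k: int = 10) -> float:
    """NDCG@k with a single relevant item per query.

    With one relevant document the ideal DCG is 1, so each query contributes
    1 / log2(rank + 1) when the correct item appears in the top k, else 0.
    """
    if not correct_ranks:
        return 0.0
    gains = [1.0 / np.log2(rank + 1) if rank <= k else 0.0 for rank in correct_ranks]
    return float(np.mean(gains))
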
def main(
    results_dir: str = DEFAULT_EVALUATION_DIR,
    model_name: str = "code_model2vec_distilled_models",
    output: str = "REPORT.md",
    export_csv: str | None = None,
) -> None:
    """Main analysis function."""
    logger.info("Starting CodeSearchNet Analysis with Integrated Benchmarks")
    logger.info("=" * 60)

    # Setup output directories
    output_dir, images_dir, reports_dir = setup_directories()

    # Initialize analyzer with results directory (benchmarks are integrated)
    analyzer = CodeSearchNetAnalyzer(
        results_dir=results_dir,
        benchmark_dir=None,  # No longer needed - benchmarks are in comprehensive files
        images_dir=images_dir,
    )

    # Load results (this will also load benchmark data from comprehensive files)
    analyzer.load_results()

    if not analyzer.results:
        logger.error("No evaluation results found! Please run evaluation first.")
        return

    # Print summary (includes both evaluation and benchmark summaries)
    analyzer.print_summary()
    analyzer.analyze_language_performance()

    # Analyze benchmark performance if available
    if analyzer.benchmark_results:
        analyzer.analyze_benchmark_performance()
    else:
        logger.warning("No benchmark results found. Models may have been evaluated with --skip-benchmark flag.")

    # Generate comprehensive report with benchmark integration
    logger.info("Generating comprehensive report with integrated benchmark data...")
    report = analyzer.generate_comprehensive_report(model_name)

    # Save report (UTF-8 so emoji and other non-ASCII characters are written safely)
    report_path = Path(output)
    with report_path.open("w", encoding="utf-8") as f:
        f.write(report)

    # Export CSV if requested
    if export_csv:
        analyzer.export_results(export_csv)

    # Export benchmark CSV if available
    if analyzer.benchmark_df is not None and not analyzer.benchmark_df.empty:
        benchmark_csv = report_path.parent / f"{model_name}_benchmark_comparison.csv"
        analyzer.benchmark_df.to_csv(benchmark_csv, index=False)
        logger.info(f"📊 Benchmark comparison saved to: {benchmark_csv}")

    logger.info("✅ CodeSearchNet analysis with integrated benchmarks complete!")
    logger.info(f"📊 Report saved to: {report_path}")
    logger.info(f"🖼️ Charts saved to: {images_dir}")
    logger.info(f"💾 Source: Comprehensive evaluation files in {results_dir}")


if __name__ == "__main__":
    main()
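
# ---------------------------------------------------------------------------
# For reference: the distillation step summarized in the report's Methodology
# section (Model2Vec, PCA dims=256, SIF/Zipf weighting) is performed elsewhere
# in the pipeline, not in this analysis script. A minimal sketch, assuming the
# Model2Vec `distill` API and using one of the teacher models listed in the
# report (the output directory name is a placeholder, and the keyword
# arguments controlling SIF/Zipf weighting depend on the installed model2vec
# version):
#
#     from model2vec.distill import distill
#
#     static_model = distill(model_name="microsoft/codebert-base", pca_dims=256)
#     static_model.save_pretrained("code_model2vec_codebert-base")
# ---------------------------------------------------------------------------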