""" Calibration manager for systematic parameter optimization. Implements the main calibration manager from calibration-system-spec.md that orchestrates parameter optimization, metrics collection, and system calibration. """ import logging import time import tempfile from typing import Dict, Any, List, Optional, Callable from pathlib import Path import yaml from datetime import datetime from .parameter_registry import ParameterRegistry, Parameter from .metrics_collector import MetricsCollector, QueryMetrics from .optimization_engine import OptimizationEngine, OptimizationStrategy, OptimizationResult from src.core.platform_orchestrator import PlatformOrchestrator from src.core.interfaces import Document logger = logging.getLogger(__name__) class CalibrationManager: """ Main calibration system orchestrator following calibration-system-spec.md. Provides systematic parameter optimization and confidence calibration through: - Parameter registry management - Automated metrics collection - Multi-strategy optimization - Configuration generation - Quality validation """ def __init__(self, platform_orchestrator: Optional[PlatformOrchestrator] = None): """ Initialize calibration manager. Args: platform_orchestrator: Optional platform orchestrator instance """ self.platform_orchestrator = platform_orchestrator self.parameter_registry = ParameterRegistry() self.metrics_collector = MetricsCollector() self.optimization_results: Dict[str, OptimizationResult] = {} self.calibration_history: List[Dict[str, Any]] = [] # Golden test set for calibration self.test_queries: List[Dict[str, Any]] = [] def load_test_set(self, test_set_path: Path) -> None: """Load golden test set for calibration.""" try: with open(test_set_path, 'r') as f: test_data = yaml.safe_load(f) if 'test_cases' in test_data: self.test_queries = test_data['test_cases'] else: # Assume entire file is list of test cases self.test_queries = test_data if isinstance(test_data, list) else [] logger.info(f"Loaded {len(self.test_queries)} test queries from {test_set_path}") except Exception as e: logger.error(f"Failed to load test set from {test_set_path}: {e}") # Create basic test set if loading fails self._create_basic_test_set() def _create_basic_test_set(self) -> None: """Create basic test set for calibration.""" self.test_queries = [ { "test_id": "TC001", "category": "factual_simple", "query": "What is RISC-V?", "expected_behavior": { "should_answer": True, "min_confidence": 0.7, "max_confidence": 0.95, "must_contain_terms": ["instruction set", "open"], "must_not_contain": ["ARM", "x86"], "min_citations": 1 } }, { "test_id": "TC002", "category": "technical_complex", "query": "How does RISC-V instruction encoding differ from ARM?", "expected_behavior": { "should_answer": True, "min_confidence": 0.5, "max_confidence": 0.8, "must_contain_terms": ["instruction", "encoding", "RISC-V"], "min_citations": 2 } }, { "test_id": "TC003", "category": "edge_case", "query": "RISC-V quantum computing applications", "expected_behavior": { "should_answer": True, "max_confidence": 0.6, "must_contain_terms": ["RISC-V"], "min_citations": 1 } } ] logger.info("Created basic test set with 3 test queries") def calibrate( self, test_set: Optional[Path] = None, strategy: OptimizationStrategy = OptimizationStrategy.GRID_SEARCH, target_metric: str = "overall_accuracy", parameters_to_optimize: Optional[List[str]] = None, max_evaluations: Optional[int] = None, base_config: Optional[Path] = None ) -> OptimizationResult: """ Run full calibration following 
    def _create_basic_test_set(self) -> None:
        """Create basic test set for calibration."""
        self.test_queries = [
            {
                "test_id": "TC001",
                "category": "factual_simple",
                "query": "What is RISC-V?",
                "expected_behavior": {
                    "should_answer": True,
                    "min_confidence": 0.7,
                    "max_confidence": 0.95,
                    "must_contain_terms": ["instruction set", "open"],
                    "must_not_contain": ["ARM", "x86"],
                    "min_citations": 1
                }
            },
            {
                "test_id": "TC002",
                "category": "technical_complex",
                "query": "How does RISC-V instruction encoding differ from ARM?",
                "expected_behavior": {
                    "should_answer": True,
                    "min_confidence": 0.5,
                    "max_confidence": 0.8,
                    "must_contain_terms": ["instruction", "encoding", "RISC-V"],
                    "min_citations": 2
                }
            },
            {
                "test_id": "TC003",
                "category": "edge_case",
                "query": "RISC-V quantum computing applications",
                "expected_behavior": {
                    "should_answer": True,
                    "max_confidence": 0.6,
                    "must_contain_terms": ["RISC-V"],
                    "min_citations": 1
                }
            }
        ]
        logger.info("Created basic test set with 3 test queries")

    def calibrate(
        self,
        test_set: Optional[Path] = None,
        strategy: OptimizationStrategy = OptimizationStrategy.GRID_SEARCH,
        target_metric: str = "overall_accuracy",
        parameters_to_optimize: Optional[List[str]] = None,
        max_evaluations: Optional[int] = None,
        base_config: Optional[Path] = None
    ) -> OptimizationResult:
        """
        Run full calibration following the calibration-system-spec.md workflow.

        Args:
            test_set: Path to golden test set (optional, uses basic set if None)
            strategy: Optimization strategy to use
            target_metric: Metric to optimize
            parameters_to_optimize: List of parameter names to optimize (optional)
            max_evaluations: Maximum evaluations for optimization
            base_config: Base configuration file to use

        Returns:
            OptimizationResult with best parameters and performance data
        """
        logger.info("Starting full calibration process")

        # Load test set
        if test_set:
            self.load_test_set(test_set)
        elif not self.test_queries:
            self._create_basic_test_set()

        # Determine parameters to optimize
        if parameters_to_optimize is None:
            # Use key parameters identified as most impactful
            parameters_to_optimize = [
                "bm25_k1", "bm25_b", "rrf_k",
                "dense_weight", "sparse_weight",
                "score_aware_score_weight", "score_aware_rank_weight"
            ]

        # Generate search space
        search_space = self.parameter_registry.get_search_space(parameters_to_optimize)
        total_combinations = 1 if search_space else 0
        for values in search_space.values():
            total_combinations *= len(values)
        logger.info(
            f"Optimizing {len(parameters_to_optimize)} parameters "
            f"across {total_combinations} grid combinations"
        )

        # Create evaluation function
        evaluation_func = self._create_evaluation_function(base_config, target_metric)

        # Run optimization
        engine = OptimizationEngine(evaluation_func)
        result = engine.optimize(
            search_space,
            target_metric=target_metric,
            strategy=strategy,
            max_evaluations=max_evaluations
        )

        # Store results
        self.optimization_results[f"{strategy.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"] = result

        # Update parameter registry with best values
        for param_name, value in result.best_parameters.items():
            self.parameter_registry.update_parameter_current_value(param_name, value)

        logger.info(f"Calibration completed. Best score: {result.best_score:.4f}")
        logger.info(f"Best parameters: {result.best_parameters}")

        return result
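    # Illustrative focused run (assuming `manager = CalibrationManager()` and the
    # basic test set above). The component and category names come from this module;
    # which components are actually registered depends on ParameterRegistry:
    #
    #   result = manager.calibrate_component(
    #       component="retriever",
    #       test_subset="technical_complex",
    #       strategy=OptimizationStrategy.GRID_SEARCH,
    #   )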
    def calibrate_component(
        self,
        component: str,
        test_subset: Optional[str] = None,
        parameters: Optional[List[str]] = None,
        strategy: OptimizationStrategy = OptimizationStrategy.GRID_SEARCH
    ) -> Optional[OptimizationResult]:
        """
        Run focused calibration on a specific component.

        Args:
            component: Component name to focus calibration on
            test_subset: Subset of test queries to use (optional)
            parameters: Specific parameters to optimize (optional)
            strategy: Optimization strategy

        Returns:
            OptimizationResult for component-specific optimization, or None if the
            component has no registered parameters
        """
        logger.info(f"Starting component calibration for: {component}")

        # Get component parameters
        component_params = self.parameter_registry.get_parameters_for_component(component)
        if parameters:
            # Filter to specified parameters
            param_names = [p.name for p in component_params if p.name in parameters]
        else:
            param_names = [p.name for p in component_params]

        if not param_names:
            logger.warning(f"No parameters found for component {component}")
            return None

        # Filter test queries if a subset is specified
        test_queries = self.test_queries
        if test_subset:
            test_queries = [q for q in self.test_queries if q.get("category") == test_subset]

        logger.info(f"Using {len(test_queries)} test queries for component calibration")

        # Run targeted calibration
        search_space = self.parameter_registry.get_search_space(param_names)
        evaluation_func = self._create_evaluation_function(target_queries=test_queries)

        engine = OptimizationEngine(evaluation_func)
        result = engine.optimize(search_space, strategy=strategy)

        # Store component-specific result
        self.optimization_results[f"{component}_{strategy.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"] = result

        return result
    def _create_evaluation_function(
        self,
        base_config: Optional[Path] = None,
        target_metric: str = "overall_accuracy",
        target_queries: Optional[List[Dict[str, Any]]] = None
    ) -> Callable[[Dict[str, Any]], float]:
        """Create evaluation function for parameter optimization."""

        def evaluate_parameters(parameters: Dict[str, Any]) -> float:
            """Evaluate parameter configuration and return quality score."""
            try:
                # Create temporary config with new parameters
                config_path = self._create_config_with_parameters(parameters, base_config)

                # Initialize platform orchestrator with new config
                po = PlatformOrchestrator(str(config_path))

                # Index test documents if they exist
                if hasattr(self, 'test_documents') and self.test_documents:
                    po.index_documents(self.test_documents)
                else:
                    # Create minimal test documents for evaluation
                    test_docs = [
                        Document(
                            content=(
                                "RISC-V is an open-source instruction set architecture (ISA) based on "
                                "reduced instruction set computing (RISC) principles. It provides a complete "
                                "instruction set specification designed to support computer research and education."
                            ),
                            metadata={"source": "risc_v_intro.pdf", "page": 1, "title": "RISC-V Introduction"}
                        ),
                        Document(
                            content=(
                                "The RISC-V vector extension (RVV) provides vector processing capabilities for "
                                "parallel computation. Vector instructions operate on vectors of data elements, "
                                "enabling efficient SIMD operations for scientific and machine learning applications."
                            ),
                            metadata={"source": "risc_v_vector.pdf", "page": 5, "title": "RISC-V Vector Extension"}
                        )
                    ]
                    po.index_documents(test_docs)

                # Use provided queries or default test set
                queries_to_test = target_queries if target_queries else self.test_queries

                # Run evaluation on test queries
                total_score = 0.0
                valid_evaluations = 0

                for test_case in queries_to_test:
                    try:
                        # Execute query
                        query = test_case["query"]
                        expected = test_case.get("expected_behavior", {})

                        start_time = time.time()
                        result = po.process_query(query)
                        execution_time = time.time() - start_time

                        # Start metrics collection for this query
                        metrics = self.metrics_collector.start_query_collection(
                            test_case.get("test_id", f"eval_{valid_evaluations}"),
                            query
                        )

                        # Extract result data
                        answer = result.answer if hasattr(result, 'answer') else str(result)
                        confidence = result.confidence if hasattr(result, 'confidence') else 0.5
                        citations = result.citations if hasattr(result, 'citations') else []

                        # Collect generation metrics
                        self.metrics_collector.collect_generation_metrics(
                            metrics, answer, confidence, execution_time, citations
                        )

                        actual_results = {
                            "confidence": confidence,
                            "answer": answer,
                            "citations": citations
                        }
                        self.metrics_collector.collect_validation_results(
                            metrics, expected, actual_results
                        )

                        # Calculate score based on target metric
                        if target_metric == "overall_accuracy":
                            score = self._calculate_overall_accuracy(metrics)
                        elif target_metric == "confidence_ece":
                            score = 1.0 - self._calculate_confidence_ece(metrics)  # Lower ECE is better
                        elif target_metric == "retrieval_f1":
                            score = self._calculate_retrieval_f1(metrics)
                        else:
                            # Default to validation quality
                            score = metrics.validation_results.get("answer_quality_score", 0.0)

                        total_score += score
                        valid_evaluations += 1

                    except Exception as e:
                        logger.error(f"Evaluation failed for query '{test_case.get('query', 'unknown')}': {e}")
                        continue

                # Calculate average score
                final_score = total_score / valid_evaluations if valid_evaluations > 0 else 0.0

                # Clean up temporary config
                try:
                    config_path.unlink()
                except OSError:
                    pass

                return final_score

            except Exception as e:
                logger.error(f"Parameter evaluation failed: {e}")
                return 0.0

        return evaluate_parameters
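    # The registry is assumed to map each parameter name to a dot-separated config
    # path consumed by _update_nested_config below. Hypothetical examples, matching
    # the default config structure built in _create_config_with_parameters:
    #
    #   bm25_k1      -> "retriever.config.sparse.config.k1"
    #   bm25_b       -> "retriever.config.sparse.config.b"
    #   rrf_k        -> "retriever.config.fusion.config.k"
    #   dense_weight -> "retriever.config.fusion.config.weights.dense"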
    def _create_config_with_parameters(
        self,
        parameters: Dict[str, Any],
        base_config: Optional[Path] = None
    ) -> Path:
        """Create temporary configuration file with specified parameters."""
        # Load base config or use a complete default
        if base_config and base_config.exists():
            with open(base_config, 'r') as f:
                config = yaml.safe_load(f)
        else:
            # Use a complete default config structure that is known to work
            config = {
                "document_processor": {
                    "type": "hybrid_pdf",
                    "config": {
                        "chunk_size": 1024,
                        "chunk_overlap": 128
                    }
                },
                "embedder": {
                    "type": "modular",
                    "config": {
                        "model": {
                            "type": "sentence_transformer",
                            "config": {
                                "model_name": "sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
                                "device": "mps",
                                "normalize_embeddings": True
                            }
                        },
                        "batch_processor": {
                            "type": "dynamic",
                            "config": {
                                "initial_batch_size": 32,
                                "max_batch_size": 128,
                                "optimize_for_memory": False
                            }
                        },
                        "cache": {
                            "type": "memory",
                            "config": {
                                "max_entries": 10000,
                                "max_memory_mb": 512
                            }
                        }
                    }
                },
                "retriever": {
                    "type": "modular_unified",
                    "config": {
                        "min_semantic_alignment": 0.2,
                        "vector_index": {
                            "type": "faiss",
                            "config": {
                                "index_type": "IndexFlatIP",
                                "normalize_embeddings": True,
                                "metric": "cosine"
                            }
                        },
                        "sparse": {
                            "type": "bm25",
                            "config": {
                                "k1": 1.2,
                                "b": 0.75,
                                "lowercase": True
                            }
                        },
                        "fusion": {
                            "type": "rrf",
                            "config": {
                                "k": 60,
                                "weights": {
                                    "dense": 0.7,
                                    "sparse": 0.3
                                }
                            }
                        }
                    }
                },
                "answer_generator": {
                    "type": "adaptive_modular",
                    "config": {
                        "prompt_builder": {
                            "type": "simple",
                            "config": {}
                        },
                        "llm_client": {
                            "type": "ollama",
                            "config": {
                                "model_name": "llama3.2:3b",
                                "base_url": "http://localhost:11434",
                                "timeout": 30
                            }
                        },
                        "response_parser": {
                            "type": "markdown",
                            "config": {}
                        },
                        "confidence_scorer": {
                            "type": "semantic",
                            "config": {}
                        }
                    }
                }
            }

        # Apply parameter updates
        for param_name, value in parameters.items():
            param = self.parameter_registry.get_parameter(param_name)
            if param:
                # Convert numpy values to native Python types for YAML serialization
                if hasattr(value, 'item'):  # numpy scalar
                    value = value.item()
                elif hasattr(value, 'tolist'):  # numpy array
                    value = value.tolist()

                # Update config using parameter path
                self._update_nested_config(config, param.path, value)
            else:
                logger.warning(f"Parameter {param_name} not found in registry")

        # Write temporary config file (NamedTemporaryFile avoids the race in the deprecated mktemp)
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file:
            yaml.dump(config, temp_file, indent=2)
            temp_path = Path(temp_file.name)

        return temp_path

    def _update_nested_config(self, config: Dict[str, Any], path: str, value: Any) -> None:
        """Update nested configuration using a dot-separated path."""
        parts = path.split('.')
        current = config

        # Navigate to parent
        for part in parts[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]

        # Set final value
        current[parts[-1]] = value

    def _calculate_overall_accuracy(self, metrics: QueryMetrics) -> float:
        """Calculate overall accuracy score."""
        validation = metrics.validation_results

        # Weight different validation aspects
        weights = {
            "meets_expectations": 0.4,
            "confidence_in_range": 0.2,
            "contains_required_terms": 0.2,
            "has_citations": 0.2
        }

        score = 0.0
        for aspect, weight in weights.items():
            if validation.get(aspect, False):
                score += weight

        return score

    def _calculate_confidence_ece(self, metrics: QueryMetrics) -> float:
        """Calculate Expected Calibration Error for confidence."""
        # Simplified per-query ECE proxy - a full ECE would bin many queries
        confidence = metrics.generation_metrics.get("confidence_score", 0.5)
        actual_correctness = 1.0 if metrics.validation_results.get("meets_expectations", False) else 0.0

        return abs(confidence - actual_correctness)
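    # Worked example for the heuristic F1 below (illustrative numbers): with 4
    # documents retrieved and an average semantic score of 0.6, precision = 0.6,
    # recall = min(1.0, 4 / 5.0) = 0.8, so F1 = 2 * 0.6 * 0.8 / (0.6 + 0.8) ≈ 0.686.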
    def _calculate_retrieval_f1(self, metrics: QueryMetrics) -> float:
        """Calculate retrieval F1 score."""
        # Simplified F1 based on retrieval metrics
        docs_retrieved = metrics.retrieval_metrics.get("documents_retrieved", 0)
        avg_score = metrics.retrieval_metrics.get("avg_semantic_score", 0.0)

        # Simple heuristic: good retrieval has both sufficient docs and high scores
        precision = avg_score
        recall = min(1.0, docs_retrieved / 5.0)  # Assume 5 docs is good recall

        if precision + recall == 0:
            return 0.0
        return 2 * (precision * recall) / (precision + recall)

    def generate_report(self, output_path: Path) -> None:
        """Generate comprehensive calibration report."""
        try:
            # Aggregate all results
            report_data = {
                "calibration_summary": {
                    "timestamp": datetime.now().isoformat(),
                    "total_optimizations": len(self.optimization_results),
                    "parameters_optimized": len(self.parameter_registry.parameters),
                    "test_queries_used": len(self.test_queries)
                },
                "parameter_registry": {
                    "parameters": {
                        name: {
                            "current": param.current,
                            "component": param.component,
                            "description": param.description,
                            "impacts": param.impacts
                        }
                        for name, param in self.parameter_registry.parameters.items()
                    }
                },
                "optimization_results": {
                    name: {
                        "best_score": result.best_score,
                        "best_parameters": result.best_parameters,
                        "total_evaluations": result.total_evaluations,
                        "optimization_time": result.optimization_time
                    }
                    for name, result in self.optimization_results.items()
                },
                "metrics_summary": self.metrics_collector.calculate_aggregate_metrics(),
                "recommendations": self._generate_recommendations()
            }

            # Write HTML report
            html_content = self._generate_html_report(report_data)
            with open(output_path, 'w') as f:
                f.write(html_content)

            logger.info(f"Generated calibration report: {output_path}")

        except Exception as e:
            logger.error(f"Failed to generate report: {e}")

    def _generate_recommendations(self) -> List[str]:
        """Generate actionable recommendations based on calibration results."""
        recommendations = []

        if not self.optimization_results:
            recommendations.append("No optimization results available. Run calibration first.")
            return recommendations

        # Find best performing configuration
        best_result = max(self.optimization_results.values(), key=lambda x: x.best_score)

        recommendations.extend([
            f"Best configuration achieved {best_result.best_score:.3f} score",
            f"Consider deploying parameters: {best_result.best_parameters}",
            f"Total optimization time: {best_result.optimization_time:.1f}s"
        ])

        # Parameter-specific recommendations
        best_params = best_result.best_parameters
        if "bm25_b" in best_params:
            b_value = best_params["bm25_b"]
            if b_value < 0.3:
                recommendations.append(f"BM25 b={b_value} applies little length normalization - appropriate when chunk lengths are fairly uniform")
            elif b_value > 0.7:
                recommendations.append(f"BM25 b={b_value} may over-penalize longer documents")

        if "rrf_k" in best_params:
            k_value = best_params["rrf_k"]
            if k_value < 20:
                recommendations.append(f"RRF k={k_value} provides high score discrimination")
            elif k_value > 80:
                recommendations.append(f"RRF k={k_value} may compress scores too much")

        return recommendations
    def _generate_html_report(self, data: Dict[str, Any]) -> str:
        """Generate HTML calibration report."""
        summary = data["calibration_summary"]

        # Per-optimization result sections
        result_sections = []
        for name, result in data["optimization_results"].items():
            result_sections.append(
                f"<h3>{name}</h3>"
                f"<p>Best Score: {result['best_score']:.4f}</p>"
                f"<p>Evaluations: {result['total_evaluations']}</p>"
                f"<p>Time: {result['optimization_time']:.2f}s</p>"
                f"<p>Best Parameters: <code>{result['best_parameters']}</code></p>"
            )

        # Parameter registry table rows
        parameter_rows = []
        for name, param in data["parameter_registry"]["parameters"].items():
            parameter_rows.append(
                f"<tr><td>{name}</td><td>{param['current']}</td>"
                f"<td>{param['component']}</td><td>{param.get('description', 'N/A')}</td></tr>"
            )

        recommendation_items = "".join(f"<li>{rec}</li>" for rec in data["recommendations"])

        html = f"""<html>
<head><title>Calibration Report</title></head>
<body>
<h1>Calibration Report</h1>
<p>Generated: {summary['timestamp']}</p>
<p>Total Optimizations: {summary['total_optimizations']}</p>
<p>Parameters Optimized: {summary['parameters_optimized']}</p>
<p>Test Queries Used: {summary['test_queries_used']}</p>

<h2>Optimization Results</h2>
{''.join(result_sections)}

<h2>Parameter Registry</h2>
<table border="1">
<tr><th>Parameter</th><th>Current Value</th><th>Component</th><th>Description</th></tr>
{''.join(parameter_rows)}
</table>

<h2>Recommendations</h2>
<ul>{recommendation_items}</ul>
</body>
</html>"""
        return html
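

if __name__ == "__main__":
    # Minimal usage sketch. The test set and report paths below are hypothetical;
    # running this requires the full platform stack (PlatformOrchestrator, Ollama, etc.).
    logging.basicConfig(level=logging.INFO)

    manager = CalibrationManager()
    manager.load_test_set(Path("calibration/golden_test_set.yaml"))

    calibration_result = manager.calibrate(
        strategy=OptimizationStrategy.GRID_SEARCH,
        target_metric="overall_accuracy",
        max_evaluations=50,
    )
    print(f"Best score: {calibration_result.best_score:.4f}")
    print(f"Best parameters: {calibration_result.best_parameters}")

    manager.generate_report(Path("calibration_report.html"))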