"""
Calibration manager for systematic parameter optimization.
Implements the main calibration manager from calibration-system-spec.md that
orchestrates parameter optimization, metrics collection, and system calibration.
"""
import logging
import time
import tempfile
from typing import Dict, Any, List, Optional, Callable
from pathlib import Path
import yaml
from datetime import datetime
from .parameter_registry import ParameterRegistry, Parameter
from .metrics_collector import MetricsCollector, QueryMetrics
from .optimization_engine import OptimizationEngine, OptimizationStrategy, OptimizationResult
from src.core.platform_orchestrator import PlatformOrchestrator
from src.core.interfaces import Document
logger = logging.getLogger(__name__)
class CalibrationManager:
"""
Main calibration system orchestrator following calibration-system-spec.md.
Provides systematic parameter optimization and confidence calibration through:
- Parameter registry management
- Automated metrics collection
- Multi-strategy optimization
- Configuration generation
- Quality validation
"""
def __init__(self, platform_orchestrator: Optional[PlatformOrchestrator] = None):
"""
Initialize calibration manager.
Args:
platform_orchestrator: Optional platform orchestrator instance
"""
self.platform_orchestrator = platform_orchestrator
self.parameter_registry = ParameterRegistry()
self.metrics_collector = MetricsCollector()
self.optimization_results: Dict[str, OptimizationResult] = {}
self.calibration_history: List[Dict[str, Any]] = []
        # Golden test set for calibration
        self.test_queries: List[Dict[str, Any]] = []
        # Optional document corpus for evaluation; callers may set this before calibrating
        self.test_documents: List[Document] = []
def load_test_set(self, test_set_path: Path) -> None:
"""Load golden test set for calibration."""
try:
with open(test_set_path, 'r') as f:
test_data = yaml.safe_load(f)
if 'test_cases' in test_data:
self.test_queries = test_data['test_cases']
else:
# Assume entire file is list of test cases
self.test_queries = test_data if isinstance(test_data, list) else []
logger.info(f"Loaded {len(self.test_queries)} test queries from {test_set_path}")
except Exception as e:
logger.error(f"Failed to load test set from {test_set_path}: {e}")
# Create basic test set if loading fails
self._create_basic_test_set()
def _create_basic_test_set(self) -> None:
"""Create basic test set for calibration."""
self.test_queries = [
{
"test_id": "TC001",
"category": "factual_simple",
"query": "What is RISC-V?",
"expected_behavior": {
"should_answer": True,
"min_confidence": 0.7,
"max_confidence": 0.95,
"must_contain_terms": ["instruction set", "open"],
"must_not_contain": ["ARM", "x86"],
"min_citations": 1
}
},
{
"test_id": "TC002",
"category": "technical_complex",
"query": "How does RISC-V instruction encoding differ from ARM?",
"expected_behavior": {
"should_answer": True,
"min_confidence": 0.5,
"max_confidence": 0.8,
"must_contain_terms": ["instruction", "encoding", "RISC-V"],
"min_citations": 2
}
},
{
"test_id": "TC003",
"category": "edge_case",
"query": "RISC-V quantum computing applications",
"expected_behavior": {
"should_answer": True,
"max_confidence": 0.6,
"must_contain_terms": ["RISC-V"],
"min_citations": 1
}
}
]
logger.info("Created basic test set with 3 test queries")
def calibrate(
self,
test_set: Optional[Path] = None,
strategy: OptimizationStrategy = OptimizationStrategy.GRID_SEARCH,
target_metric: str = "overall_accuracy",
parameters_to_optimize: Optional[List[str]] = None,
max_evaluations: Optional[int] = None,
base_config: Optional[Path] = None
) -> OptimizationResult:
"""
Run full calibration following calibration-system-spec.md workflow.
Args:
test_set: Path to golden test set (optional, uses basic set if None)
strategy: Optimization strategy to use
target_metric: Metric to optimize
parameters_to_optimize: List of parameter names to optimize (optional)
max_evaluations: Maximum evaluations for optimization
base_config: Base configuration file to use
Returns:
OptimizationResult with best parameters and performance data
"""
logger.info("Starting full calibration process")
# Load test set
if test_set:
self.load_test_set(test_set)
elif not self.test_queries:
self._create_basic_test_set()
# Determine parameters to optimize
if parameters_to_optimize is None:
# Use key parameters identified as most impactful
parameters_to_optimize = [
"bm25_k1", "bm25_b", "rrf_k",
"dense_weight", "sparse_weight",
"score_aware_score_weight", "score_aware_rank_weight"
]
# Generate search space
search_space = self.parameter_registry.get_search_space(parameters_to_optimize)
logger.info(f"Optimizing {len(parameters_to_optimize)} parameters with {len(list(search_space.values())[0]) if search_space else 0} combinations")
# Create evaluation function
evaluation_func = self._create_evaluation_function(base_config, target_metric)
# Run optimization
engine = OptimizationEngine(evaluation_func)
result = engine.optimize(
search_space,
target_metric=target_metric,
strategy=strategy,
max_evaluations=max_evaluations
)
# Store results
self.optimization_results[f"{strategy.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"] = result
# Update parameter registry with best values
for param_name, value in result.best_parameters.items():
self.parameter_registry.update_parameter_current_value(param_name, value)
logger.info(f"Calibration completed. Best score: {result.best_score:.4f}")
logger.info(f"Best parameters: {result.best_parameters}")
return result
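    # Usage sketch for a full calibrate() run (illustrative; the test-set and output
    # paths are assumptions, and the default config expects a local Ollama instance):
    #
    #   manager = CalibrationManager()
    #   result = manager.calibrate(
    #       test_set=Path("tests/golden_test_set.yaml"),
    #       strategy=OptimizationStrategy.GRID_SEARCH,
    #       target_metric="overall_accuracy",
    #       max_evaluations=50,
    #   )
    #   manager.save_optimal_configuration(Path("config/optimized.yaml"))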
def calibrate_component(
self,
component: str,
test_subset: Optional[str] = None,
parameters: Optional[List[str]] = None,
strategy: OptimizationStrategy = OptimizationStrategy.GRID_SEARCH
    ) -> Optional[OptimizationResult]:
        """
        Run focused calibration on a specific component.
Args:
component: Component name to focus calibration on
test_subset: Subset of test queries to use (optional)
parameters: Specific parameters to optimize (optional)
strategy: Optimization strategy
Returns:
            OptimizationResult for component-specific optimization, or None if the
            component has no matching registered parameters
"""
logger.info(f"Starting component calibration for: {component}")
# Get component parameters
component_params = self.parameter_registry.get_parameters_for_component(component)
if parameters:
# Filter to specified parameters
param_names = [p.name for p in component_params if p.name in parameters]
else:
param_names = [p.name for p in component_params]
if not param_names:
logger.warning(f"No parameters found for component {component}")
return None
# Filter test queries if subset specified
test_queries = self.test_queries
if test_subset:
test_queries = [q for q in self.test_queries if q.get("category") == test_subset]
logger.info(f"Using {len(test_queries)} test queries for component calibration")
# Run targeted calibration
search_space = self.parameter_registry.get_search_space(param_names)
evaluation_func = self._create_evaluation_function(target_queries=test_queries)
engine = OptimizationEngine(evaluation_func)
result = engine.optimize(search_space, strategy=strategy)
# Store component-specific result
self.optimization_results[f"{component}_{strategy.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"] = result
return result
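    # Component-focused calibrate_component() example (illustrative; the component
    # name must match an entry in the ParameterRegistry, "sparse_retrieval" here is
    # an assumption, while the parameter names and category come from this module):
    #
    #   result = manager.calibrate_component(
    #       component="sparse_retrieval",
    #       parameters=["bm25_k1", "bm25_b"],
    #       test_subset="factual_simple",
    #   )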
def _create_evaluation_function(
self,
base_config: Optional[Path] = None,
target_metric: str = "overall_accuracy",
target_queries: Optional[List[Dict[str, Any]]] = None
) -> Callable[[Dict[str, Any]], float]:
"""Create evaluation function for parameter optimization."""
def evaluate_parameters(parameters: Dict[str, Any]) -> float:
"""Evaluate parameter configuration and return quality score."""
try:
# Create temporary config with new parameters
config_path = self._create_config_with_parameters(parameters, base_config)
# Initialize platform orchestrator with new config
po = PlatformOrchestrator(str(config_path))
# Index test documents if they exist
if hasattr(self, 'test_documents') and self.test_documents:
po.index_documents(self.test_documents)
else:
# Create minimal test documents for evaluation
from src.core.interfaces import Document
test_docs = [
Document(
content="RISC-V is an open-source instruction set architecture (ISA) based on reduced instruction set computing (RISC) principles. It provides a complete instruction set specification designed to support computer research and education.",
metadata={"source": "risc_v_intro.pdf", "page": 1, "title": "RISC-V Introduction"}
),
Document(
content="The RISC-V vector extension (RVV) provides vector processing capabilities for parallel computation. Vector instructions operate on vectors of data elements, enabling efficient SIMD operations for scientific and machine learning applications.",
metadata={"source": "risc_v_vector.pdf", "page": 5, "title": "RISC-V Vector Extension"}
)
]
po.index_documents(test_docs)
# Use provided queries or default test set
queries_to_test = target_queries if target_queries else self.test_queries
# Run evaluation on test queries
total_score = 0.0
valid_evaluations = 0
for test_case in queries_to_test:
try:
# Execute query
query = test_case["query"]
expected = test_case.get("expected_behavior", {})
start_time = time.time()
result = po.process_query(query)
execution_time = time.time() - start_time
                        # Start metrics collection for this test case
metrics = self.metrics_collector.start_query_collection(
test_case.get("test_id", f"eval_{valid_evaluations}"),
query
)
# Extract result data
answer = result.answer if hasattr(result, 'answer') else str(result)
confidence = result.confidence if hasattr(result, 'confidence') else 0.5
citations = result.citations if hasattr(result, 'citations') else []
# Collect metrics
self.metrics_collector.collect_generation_metrics(
metrics, answer, confidence, execution_time, citations
)
actual_results = {
"confidence": confidence,
"answer": answer,
"citations": citations
}
self.metrics_collector.collect_validation_results(
metrics, expected, actual_results
)
# Calculate score based on target metric
if target_metric == "overall_accuracy":
score = self._calculate_overall_accuracy(metrics)
elif target_metric == "confidence_ece":
score = 1.0 - self._calculate_confidence_ece(metrics) # Lower ECE is better
elif target_metric == "retrieval_f1":
score = self._calculate_retrieval_f1(metrics)
else:
# Default to validation quality
score = metrics.validation_results.get("answer_quality_score", 0.0)
total_score += score
valid_evaluations += 1
except Exception as e:
logger.error(f"Evaluation failed for query '{test_case.get('query', 'unknown')}': {e}")
continue
# Calculate average score
final_score = total_score / valid_evaluations if valid_evaluations > 0 else 0.0
# Clean up temporary config
try:
config_path.unlink()
                except OSError:
pass
return final_score
except Exception as e:
logger.error(f"Parameter evaluation failed: {e}")
return 0.0
return evaluate_parameters
def _create_config_with_parameters(
self,
parameters: Dict[str, Any],
base_config: Optional[Path] = None
) -> Path:
"""Create temporary configuration file with specified parameters."""
# Load base config or use complete default
if base_config and base_config.exists():
with open(base_config, 'r') as f:
config = yaml.safe_load(f)
else:
            # Fall back to a complete default config structure for all components
config = {
"document_processor": {
"type": "hybrid_pdf",
"config": {
"chunk_size": 1024,
"chunk_overlap": 128
}
},
"embedder": {
"type": "modular",
"config": {
"model": {
"type": "sentence_transformer",
"config": {
"model_name": "sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
"device": "mps",
"normalize_embeddings": True
}
},
"batch_processor": {
"type": "dynamic",
"config": {
"initial_batch_size": 32,
"max_batch_size": 128,
"optimize_for_memory": False
}
},
"cache": {
"type": "memory",
"config": {
"max_entries": 10000,
"max_memory_mb": 512
}
}
}
},
"retriever": {
"type": "modular_unified",
"config": {
"min_semantic_alignment": 0.2,
"vector_index": {
"type": "faiss",
"config": {
"index_type": "IndexFlatIP",
"normalize_embeddings": True,
"metric": "cosine"
}
},
"sparse": {
"type": "bm25",
"config": {
"k1": 1.2,
"b": 0.75,
"lowercase": True
}
},
"fusion": {
"type": "rrf",
"config": {
"k": 60,
"weights": {
"dense": 0.7,
"sparse": 0.3
}
}
}
}
},
"answer_generator": {
"type": "adaptive_modular",
"config": {
"prompt_builder": {
"type": "simple",
"config": {}
},
"llm_client": {
"type": "ollama",
"config": {
"model_name": "llama3.2:3b",
"base_url": "http://localhost:11434",
"timeout": 30
}
},
"response_parser": {
"type": "markdown",
"config": {}
},
"confidence_scorer": {
"type": "semantic",
"config": {}
}
}
}
}
# Apply parameter updates
for param_name, value in parameters.items():
param = self.parameter_registry.get_parameter(param_name)
if param:
# Convert numpy values to native Python types for YAML serialization
if hasattr(value, 'item'): # numpy scalar
value = value.item()
elif hasattr(value, 'tolist'): # numpy array
value = value.tolist()
# Update config using parameter path
self._update_nested_config(config, param.path, value)
else:
logger.warning(f"Parameter {param_name} not found in registry")
        # Write temporary config file; NamedTemporaryFile with delete=False avoids the
        # race condition of the deprecated tempfile.mktemp
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            temp_file = Path(f.name)
            yaml.dump(config, f, indent=2)
return temp_file
def _update_nested_config(self, config: Dict[str, Any], path: str, value: Any) -> None:
"""Update nested configuration using dot-separated path."""
parts = path.split('.')
current = config
# Navigate to parent
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
# Set final value
current[parts[-1]] = value
def _calculate_overall_accuracy(self, metrics: QueryMetrics) -> float:
"""Calculate overall accuracy score."""
validation = metrics.validation_results
# Weight different validation aspects
weights = {
"meets_expectations": 0.4,
"confidence_in_range": 0.2,
"contains_required_terms": 0.2,
"has_citations": 0.2
}
score = 0.0
for aspect, weight in weights.items():
if validation.get(aspect, False):
score += weight
return score
def _calculate_confidence_ece(self, metrics: QueryMetrics) -> float:
"""Calculate Expected Calibration Error for confidence."""
# Simplified ECE calculation - would need more data for full ECE
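        # Full ECE would bin queries by confidence and take the bin-size-weighted average
        # of |accuracy(bin) - confidence(bin)|; with a single query this collapses to the
        # absolute confidence/correctness gap computed below.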
confidence = metrics.generation_metrics.get("confidence_score", 0.5)
actual_correctness = 1.0 if metrics.validation_results.get("meets_expectations", False) else 0.0
return abs(confidence - actual_correctness)
def _calculate_retrieval_f1(self, metrics: QueryMetrics) -> float:
"""Calculate retrieval F1 score."""
# Simplified F1 based on retrieval metrics
docs_retrieved = metrics.retrieval_metrics.get("documents_retrieved", 0)
avg_score = metrics.retrieval_metrics.get("avg_semantic_score", 0.0)
# Simple heuristic: good retrieval has both sufficient docs and high scores
precision = avg_score
recall = min(1.0, docs_retrieved / 5.0) # Assume 5 docs is good recall
if precision + recall == 0:
return 0.0
return 2 * (precision * recall) / (precision + recall)
def generate_report(self, output_path: Path) -> None:
"""Generate comprehensive calibration report."""
try:
# Aggregate all results
report_data = {
"calibration_summary": {
"timestamp": datetime.now().isoformat(),
"total_optimizations": len(self.optimization_results),
"parameters_optimized": len(self.parameter_registry.parameters),
"test_queries_used": len(self.test_queries)
},
"parameter_registry": {
"parameters": {
name: {
"current": param.current,
"component": param.component,
"description": param.description,
"impacts": param.impacts
}
for name, param in self.parameter_registry.parameters.items()
}
},
"optimization_results": {
name: {
"best_score": result.best_score,
"best_parameters": result.best_parameters,
"total_evaluations": result.total_evaluations,
"optimization_time": result.optimization_time
}
for name, result in self.optimization_results.items()
},
"metrics_summary": self.metrics_collector.calculate_aggregate_metrics(),
"recommendations": self._generate_recommendations()
}
# Write HTML report
html_content = self._generate_html_report(report_data)
with open(output_path, 'w') as f:
f.write(html_content)
logger.info(f"Generated calibration report: {output_path}")
except Exception as e:
logger.error(f"Failed to generate report: {e}")
def _generate_recommendations(self) -> List[str]:
"""Generate actionable recommendations based on calibration results."""
recommendations = []
if not self.optimization_results:
recommendations.append("No optimization results available. Run calibration first.")
return recommendations
# Find best performing configuration
best_result = max(self.optimization_results.values(), key=lambda x: x.best_score)
recommendations.extend([
f"Best configuration achieved {best_result.best_score:.3f} score",
f"Consider deploying parameters: {best_result.best_parameters}",
f"Total optimization time: {best_result.optimization_time:.1f}s"
])
# Parameter-specific recommendations
best_params = best_result.best_parameters
if "bm25_b" in best_params:
b_value = best_params["bm25_b"]
if b_value < 0.3:
recommendations.append(f"BM25 b={b_value} suggests documents have varying lengths - good choice")
elif b_value > 0.7:
recommendations.append(f"BM25 b={b_value} may over-penalize longer documents")
if "rrf_k" in best_params:
k_value = best_params["rrf_k"]
if k_value < 20:
recommendations.append(f"RRF k={k_value} provides high score discrimination")
elif k_value > 80:
recommendations.append(f"RRF k={k_value} may compress scores too much")
return recommendations
def _generate_html_report(self, data: Dict[str, Any]) -> str:
"""Generate HTML calibration report."""
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>RAG System Calibration Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
.header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
.section {{ margin: 30px 0; }}
.metric {{ background-color: #f9f9f9; padding: 15px; margin: 10px 0; border-radius: 3px; }}
.param-table {{ border-collapse: collapse; width: 100%; }}
.param-table th, .param-table td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
.param-table th {{ background-color: #f2f2f2; }}
.recommendation {{ background-color: #e8f4fd; padding: 10px; margin: 5px 0; border-radius: 3px; }}
</style>
</head>
<body>
<div class="header">
<h1>RAG System Calibration Report</h1>
<p>Generated: {data['calibration_summary']['timestamp']}</p>
<p>Total Optimizations: {data['calibration_summary']['total_optimizations']}</p>
<p>Parameters Optimized: {data['calibration_summary']['parameters_optimized']}</p>
</div>
<div class="section">
<h2>Optimization Results</h2>
"""
for name, result in data['optimization_results'].items():
html += f"""
<div class="metric">
<h3>{name}</h3>
<p><strong>Best Score:</strong> {result['best_score']:.4f}</p>
<p><strong>Evaluations:</strong> {result['total_evaluations']}</p>
<p><strong>Time:</strong> {result['optimization_time']:.2f}s</p>
<p><strong>Best Parameters:</strong></p>
<ul>
"""
for param, value in result['best_parameters'].items():
html += f"<li>{param}: {value}</li>"
html += "</ul></div>"
html += """
</div>
<div class="section">
<h2>Current Parameter Configuration</h2>
<table class="param-table">
<tr>
<th>Parameter</th>
<th>Current Value</th>
<th>Component</th>
<th>Description</th>
</tr>
"""
for name, param in data['parameter_registry']['parameters'].items():
html += f"""
<tr>
<td>{name}</td>
<td>{param['current']}</td>
<td>{param['component']}</td>
<td>{param.get('description', 'N/A')}</td>
</tr>
"""
html += """
</table>
</div>
<div class="section">
<h2>Recommendations</h2>
"""
for rec in data['recommendations']:
html += f'<div class="recommendation">{rec}</div>'
html += """
</div>
</body>
</html>
"""
return html
def save_optimal_configuration(self, output_path: Path, optimization_name: Optional[str] = None) -> None:
"""Save optimal configuration based on calibration results."""
if not self.optimization_results:
logger.error("No optimization results available")
return
# Get best result
if optimization_name and optimization_name in self.optimization_results:
result = self.optimization_results[optimization_name]
        else:
            if optimization_name:
                logger.warning(f"Optimization '{optimization_name}' not found; falling back to best result")
            result = max(self.optimization_results.values(), key=lambda x: x.best_score)
# Generate optimal config
optimal_config = self._create_config_with_parameters(result.best_parameters)
# Copy to output path with metadata
with open(optimal_config, 'r') as f:
config_content = f.read()
header = f"""# Optimal RAG Configuration
# Generated by Calibration System
# Timestamp: {datetime.now().isoformat()}
# Best Score: {result.best_score:.4f}
# Total Evaluations: {result.total_evaluations}
# Optimization Time: {result.optimization_time:.2f}s
"""
with open(output_path, 'w') as f:
f.write(header + config_content)
# Clean up temp file
optimal_config.unlink()
logger.info(f"Saved optimal configuration to {output_path}")
if __name__ == "__main__":
# Test calibration manager
manager = CalibrationManager()
print("Calibration Manager Test")
print("=" * 40)
# Show parameter registry
print(manager.parameter_registry.get_parameter_summary())
# Test basic functionality
print(f"\nTest queries loaded: {len(manager.test_queries)}")
if manager.test_queries:
print("Sample query:", manager.test_queries[0]["query"])