# enhanced-rag-demo/demo/utils/performance_timing.py
"""
Performance Timing Utilities for Epic 2 Demo
============================================
Provides timing context managers and performance instrumentation for accurate
measurement of component performance in the Epic 2 demo system.
"""

import time
import uuid
import logging
from contextlib import contextmanager
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from threading import Lock

logger = logging.getLogger(__name__)


@dataclass
class TimingResult:
    """Represents a timing measurement result"""

    stage_name: str
    start_time: float
    end_time: float
    duration_ms: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        return self.duration_ms / 1000.0


@dataclass
class PipelineTimings:
    """Aggregates timing results for a complete pipeline"""

    total_start: float
    total_end: Optional[float] = None
    stages: List[TimingResult] = field(default_factory=list)

    @property
    def total_duration_ms(self) -> float:
        if self.total_end is None:
            return 0.0
        return (self.total_end - self.total_start) * 1000.0

    def get_stage_timings(self) -> Dict[str, Dict[str, Any]]:
        """Get stage timings in the format expected by the demo UI"""
        timings = {}
        for stage in self.stages:
            timings[stage.stage_name] = {
                "time_ms": stage.duration_ms,
                "results": stage.metadata.get("results", 0),
                "metadata": stage.metadata,
            }
        return timings

    def add_stage(self, stage_name: str, duration_ms: float,
                  metadata: Optional[Dict[str, Any]] = None) -> None:
        """Add a completed stage timing.

        The start time is reconstructed from the current wall-clock time and
        the measured duration, since callers time the stage themselves.
        """
        current_time = time.time()
        stage = TimingResult(
            stage_name=stage_name,
            start_time=current_time - (duration_ms / 1000.0),
            end_time=current_time,
            duration_ms=duration_ms,
            metadata=metadata or {},
        )
        self.stages.append(stage)
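

# A minimal usage sketch for PipelineTimings on its own (illustrative only;
# the stage name, duration, and result count are made up):
#
#     timings = PipelineTimings(total_start=time.time())
#     timings.add_stage("dense_retrieval", 12.5, {"results": 10})
#     timings.total_end = time.time()
#     timings.get_stage_timings()
#     # -> {"dense_retrieval": {"time_ms": 12.5, "results": 10,
#     #                         "metadata": {"results": 10}}}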


class PerformanceInstrumentation:
    """Main performance timing instrumentation for the Epic 2 demo"""

    def __init__(self):
        self._active_timings: Dict[str, PipelineTimings] = {}
        self._lock = Lock()

    def start_pipeline(self, pipeline_id: str) -> PipelineTimings:
        """Start timing a new pipeline"""
        with self._lock:
            timing = PipelineTimings(total_start=time.time())
            self._active_timings[pipeline_id] = timing
            return timing

    def finish_pipeline(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Finish timing a pipeline and return its results"""
        with self._lock:
            if pipeline_id in self._active_timings:
                timing = self._active_timings[pipeline_id]
                timing.total_end = time.time()
                del self._active_timings[pipeline_id]
                return timing
            return None

    @contextmanager
    def time_stage(self, pipeline_id: str, stage_name: str,
                   metadata: Optional[Dict[str, Any]] = None):
        """Context manager for timing a single pipeline stage"""
        start_time = time.time()
        try:
            yield
        finally:
            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000.0
            with self._lock:
                if pipeline_id in self._active_timings:
                    timing = self._active_timings[pipeline_id]
                    timing.add_stage(stage_name, duration_ms, metadata or {})
                else:
                    # Avoid silently dropping measurements for unknown pipelines
                    logger.warning(
                        f"Stage '{stage_name}' timed for unknown pipeline '{pipeline_id}'"
                    )
            logger.debug(f"Stage '{stage_name}' completed in {duration_ms:.2f}ms")

    def get_timing(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Get the in-progress timing for a pipeline, if any"""
        with self._lock:
            return self._active_timings.get(pipeline_id)
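

# A short usage sketch for PerformanceInstrumentation (the pipeline id and
# stage name here are hypothetical; time_stage records into the active
# pipeline entry):
#
#     instr = PerformanceInstrumentation()
#     instr.start_pipeline("query_1")
#     with instr.time_stage("query_1", "sparse_retrieval", {"results": 5}):
#         ...  # the actual retrieval work goes here
#     result = instr.finish_pipeline("query_1")
#     # result.get_stage_timings()["sparse_retrieval"]["time_ms"] -> duration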


class ComponentPerformanceExtractor:
    """Extracts performance metrics from RAG system components"""

    @staticmethod
    def extract_retriever_metrics(retriever) -> Dict[str, Any]:
        """Extract detailed timing metrics from ModularUnifiedRetriever"""
        metrics = {}
        # Try to get performance metrics from the retriever
        if hasattr(retriever, 'get_metrics'):
            component_metrics = retriever.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                retrieval_stats = component_metrics.get('retrieval_stats', {})
                # Get sub-component statistics
                sub_components = component_metrics.get('sub_components', {})
                # Extract reranker and fusion statistics
                reranker_stats = sub_components.get('reranker', {}).get('statistics', {})
                fusion_stats = sub_components.get('fusion_strategy', {}).get('statistics', {})
                # Create metrics in the expected format
                metrics['dense_retrieval'] = {
                    'time_ms': retrieval_stats.get('last_retrieval_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0),
                }
                metrics['sparse_retrieval'] = {
                    'time_ms': retrieval_stats.get('avg_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0),
                }
                metrics['fusion'] = {
                    'time_ms': fusion_stats.get('avg_graph_latency_ms', 0),
                    'results': fusion_stats.get('total_fusions', 0),
                }
                metrics['neural_reranking'] = {
                    'time_ms': reranker_stats.get('total_latency_ms', 0),
                    'results': reranker_stats.get('successful_queries', 0),
                }
                # Total retrieval time
                metrics['total_retrieval_time_ms'] = retrieval_stats.get('total_time', 0) * 1000
        return metrics
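
    # The retriever metrics above assume a nested get_metrics() payload
    # shaped roughly like this (reconstructed from the keys read; the values
    # are illustrative, with times in seconds unless suffixed _ms):
    #
    #     {
    #         "indexed_documents": 1200,
    #         "retrieval_stats": {"last_retrieval_time": 0.012,
    #                             "avg_time": 0.015, "total_time": 4.2},
    #         "sub_components": {
    #             "reranker": {"statistics": {"total_latency_ms": 55.0,
    #                                         "successful_queries": 42}},
    #             "fusion_strategy": {"statistics": {"avg_graph_latency_ms": 3.1,
    #                                                "total_fusions": 42}},
    #         },
    #     }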

    @staticmethod
    def extract_generator_metrics(generator) -> Dict[str, Any]:
        """Extract detailed timing metrics from AnswerGenerator"""
        metrics = {}
        # Try to get performance metrics from the generator
        if hasattr(generator, 'get_metrics'):
            component_metrics = generator.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                generation_count = component_metrics.get('generation_count', 0)
                total_time = component_metrics.get('total_time', 0)
                avg_time = component_metrics.get('avg_time', 0)
                # Get sub-component information
                sub_components = component_metrics.get('sub_components', {})
                llm_client = sub_components.get('llm_client', {})
                # The generator only reports aggregate times, so the per-stage
                # numbers below are fixed-ratio estimates of the average
                # generation time (10% / 80% / 5% / 5%), not measurements.
                metrics['prompt_building'] = {
                    'time_ms': avg_time * 1000 * 0.1,
                    'results': generation_count,
                }
                metrics['llm_generation'] = {
                    'time_ms': avg_time * 1000 * 0.8,
                    'results': generation_count,
                }
                metrics['response_parsing'] = {
                    'time_ms': avg_time * 1000 * 0.05,
                    'results': generation_count,
                }
                metrics['confidence_scoring'] = {
                    'time_ms': avg_time * 1000 * 0.05,
                    'results': generation_count,
                }
                # Total generation time
                metrics['total_generation_time_ms'] = total_time * 1000
        return metrics
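
    # Similarly, the generator metrics assume a get_metrics() payload along
    # these lines (keys taken from the reads above; numbers are illustrative):
    #
    #     {
    #         "generation_count": 42,
    #         "total_time": 63.0,   # seconds
    #         "avg_time": 1.5,      # seconds per generation
    #         "sub_components": {"llm_client": {...}},
    #     }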

    @staticmethod
    def create_demo_timing_format(retriever_metrics: Dict[str, Any],
                                  generator_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Create the timing format expected by the demo UI"""
        return {
            # Retrieval stages (fusion metrics are surfaced to the UI as the
            # graph-enhancement stage)
            "dense_retrieval": retriever_metrics.get('dense_retrieval', {"time_ms": 0, "results": 0}),
            "sparse_retrieval": retriever_metrics.get('sparse_retrieval', {"time_ms": 0, "results": 0}),
            "graph_enhancement": retriever_metrics.get('fusion', {"time_ms": 0, "results": 0}),
            "neural_reranking": retriever_metrics.get('neural_reranking', {"time_ms": 0, "results": 0}),
            # Generation stages
            "prompt_building": generator_metrics.get('prompt_building', {"time_ms": 0, "results": 0}),
            "llm_generation": generator_metrics.get('llm_generation', {"time_ms": 0, "results": 0}),
            "response_parsing": generator_metrics.get('response_parsing', {"time_ms": 0, "results": 0}),
            "confidence_scoring": generator_metrics.get('confidence_scoring', {"time_ms": 0, "results": 0}),
        }
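
# Sketch of how the extractor pieces fit together, assuming `retriever` and
# `generator` objects that expose the duck-typed get_metrics() checked above:
#
#     retriever_metrics = ComponentPerformanceExtractor.extract_retriever_metrics(retriever)
#     generator_metrics = ComponentPerformanceExtractor.extract_generator_metrics(generator)
#     ui_timings = ComponentPerformanceExtractor.create_demo_timing_format(
#         retriever_metrics, generator_metrics)
#     # ui_timings["graph_enhancement"] -> {"time_ms": ..., "results": ...}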


# Global performance instrumentation instance
performance_instrumentation = PerformanceInstrumentation()


@contextmanager
def time_query_pipeline(query: str):
    """Context manager for timing a complete query-processing pipeline"""
    # A millisecond timestamp alone can collide for concurrent queries, so a
    # short random suffix keeps pipeline ids unique.
    pipeline_id = f"query_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"
    timing = performance_instrumentation.start_pipeline(pipeline_id)
    try:
        yield timing, pipeline_id
    finally:
        final_timing = performance_instrumentation.finish_pipeline(pipeline_id)
        if final_timing:
            logger.info(
                f"Query pipeline for {query!r} completed in "
                f"{final_timing.total_duration_ms:.2f}ms"
            )