""" | |
Performance Timing Utilities for Epic 2 Demo | |
============================================ | |
Provides timing context managers and performance instrumentation for accurate | |
measurement of component performance in the Epic 2 demo system. | |
""" | |
import time | |
import logging | |
from contextlib import contextmanager | |
from typing import Dict, Any, Optional, List | |
from dataclasses import dataclass, field | |
from threading import Lock | |
logger = logging.getLogger(__name__) | |
@dataclass
class TimingResult:
    """Represents a single timing measurement result."""

    stage_name: str
    start_time: float
    end_time: float
    duration_ms: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        """Duration converted from milliseconds to seconds."""
        return self.duration_ms / 1000.0
@dataclass
class PipelineTimings:
    """Aggregates timing results for a complete pipeline."""

    total_start: float
    total_end: Optional[float] = None
    stages: List[TimingResult] = field(default_factory=list)

    @property
    def total_duration_ms(self) -> float:
        """Total wall-clock duration in milliseconds, or 0.0 while still running."""
        if self.total_end is None:
            return 0.0
        return (self.total_end - self.total_start) * 1000.0

    def get_stage_timings(self) -> Dict[str, Dict[str, Any]]:
        """Get stage timings in the format expected by the demo UI."""
        timings = {}
        for stage in self.stages:
            timings[stage.stage_name] = {
                "time_ms": stage.duration_ms,
                "results": stage.metadata.get("results", 0),
                "metadata": stage.metadata,
            }
        return timings

    def add_stage(self, stage_name: str, duration_ms: float,
                  metadata: Optional[Dict[str, Any]] = None) -> None:
        """Add a completed stage timing, back-dating start_time from the duration."""
        current_time = time.time()
        stage = TimingResult(
            stage_name=stage_name,
            start_time=current_time - (duration_ms / 1000.0),
            end_time=current_time,
            duration_ms=duration_ms,
            metadata=metadata or {},
        )
        self.stages.append(stage)
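
# Hedged illustration (added for this write-up, not part of the original module):
# a minimal sketch of the PipelineTimings aggregation flow. The stage name,
# duration, and metadata values below are invented for the example.
def _example_pipeline_timings() -> None:
    timings = PipelineTimings(total_start=time.time())
    timings.add_stage("sparse_retrieval", 12.5, metadata={"results": 20})
    timings.total_end = time.time()
    print(timings.total_duration_ms, timings.get_stage_timings())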
class PerformanceInstrumentation:
    """Main performance timing instrumentation for the Epic 2 demo."""

    def __init__(self) -> None:
        self._active_timings: Dict[str, PipelineTimings] = {}
        self._lock = Lock()

    def start_pipeline(self, pipeline_id: str) -> PipelineTimings:
        """Start timing a new pipeline."""
        with self._lock:
            timing = PipelineTimings(total_start=time.time())
            self._active_timings[pipeline_id] = timing
            return timing

    def finish_pipeline(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Finish timing a pipeline and return its results."""
        with self._lock:
            if pipeline_id in self._active_timings:
                timing = self._active_timings[pipeline_id]
                timing.total_end = time.time()
                del self._active_timings[pipeline_id]
                return timing
            return None

    @contextmanager
    def time_stage(self, pipeline_id: str, stage_name: str,
                   metadata: Optional[Dict[str, Any]] = None):
        """Context manager for timing a single pipeline stage."""
        start_time = time.time()
        try:
            yield
        finally:
            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000.0
            with self._lock:
                if pipeline_id in self._active_timings:
                    timing = self._active_timings[pipeline_id]
                    timing.add_stage(stage_name, duration_ms, metadata or {})
            logger.debug(f"Stage '{stage_name}' completed in {duration_ms:.2f}ms")

    def get_timing(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Get the current timing for an active pipeline."""
        with self._lock:
            return self._active_timings.get(pipeline_id)
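
# Hedged usage sketch: the intended start/stage/finish flow of
# PerformanceInstrumentation. The pipeline id, stage name, and metadata are
# assumptions chosen for demonstration, not values used by the real demo.
def _example_instrumentation_usage() -> None:
    instrumentation = PerformanceInstrumentation()
    instrumentation.start_pipeline("demo_pipeline")
    with instrumentation.time_stage("demo_pipeline", "dense_retrieval",
                                    metadata={"results": 10}):
        time.sleep(0.01)  # stand-in for real retrieval work
    finished = instrumentation.finish_pipeline("demo_pipeline")
    if finished:
        print(finished.get_stage_timings())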
class ComponentPerformanceExtractor:
    """Extracts performance metrics from RAG system components."""

    @staticmethod
    def extract_retriever_metrics(retriever) -> Dict[str, Any]:
        """Extract detailed timing metrics from ModularUnifiedRetriever."""
        metrics: Dict[str, Any] = {}

        # Try to get performance metrics from the retriever
        if hasattr(retriever, 'get_metrics'):
            component_metrics = retriever.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                retrieval_stats = component_metrics.get('retrieval_stats', {})

                # Get sub-component statistics
                sub_components = component_metrics.get('sub_components', {})

                # Extract reranker and fusion statistics
                reranker_stats = sub_components.get('reranker', {}).get('statistics', {})
                fusion_stats = sub_components.get('fusion_strategy', {}).get('statistics', {})

                # Create metrics in the expected format
                metrics['dense_retrieval'] = {
                    'time_ms': retrieval_stats.get('last_retrieval_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0)
                }
                metrics['sparse_retrieval'] = {
                    'time_ms': retrieval_stats.get('avg_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0)
                }
                metrics['fusion'] = {
                    'time_ms': fusion_stats.get('avg_graph_latency_ms', 0),
                    'results': fusion_stats.get('total_fusions', 0)
                }
                metrics['neural_reranking'] = {
                    'time_ms': reranker_stats.get('total_latency_ms', 0),
                    'results': reranker_stats.get('successful_queries', 0)
                }

                # Total retrieval time
                metrics['total_retrieval_time_ms'] = retrieval_stats.get('total_time', 0) * 1000

        return metrics
    @staticmethod
    def extract_generator_metrics(generator) -> Dict[str, Any]:
        """Extract detailed timing metrics from AnswerGenerator."""
        metrics: Dict[str, Any] = {}

        # Try to get performance metrics from the generator
        if hasattr(generator, 'get_metrics'):
            component_metrics = generator.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                generation_count = component_metrics.get('generation_count', 0)
                total_time = component_metrics.get('total_time', 0)
                avg_time = component_metrics.get('avg_time', 0)

                # Get sub-component information
                sub_components = component_metrics.get('sub_components', {})
                llm_client = sub_components.get('llm_client', {})  # currently unused

                # Create metrics in the expected format; per-stage times are
                # estimated as fixed fractions of the average generation time
                metrics['prompt_building'] = {
                    'time_ms': avg_time * 1000 * 0.1,  # Estimate: 10% of total time
                    'results': generation_count
                }
                metrics['llm_generation'] = {
                    'time_ms': avg_time * 1000 * 0.8,  # Estimate: 80% of total time
                    'results': generation_count
                }
                metrics['response_parsing'] = {
                    'time_ms': avg_time * 1000 * 0.05,  # Estimate: 5% of total time
                    'results': generation_count
                }
                metrics['confidence_scoring'] = {
                    'time_ms': avg_time * 1000 * 0.05,  # Estimate: 5% of total time
                    'results': generation_count
                }

                # Total generation time
                metrics['total_generation_time_ms'] = total_time * 1000

        return metrics
    @staticmethod
    def create_demo_timing_format(retriever_metrics: Dict[str, Any],
                                  generator_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Create the timing format expected by the demo UI."""
        return {
            # Retrieval stages
            "dense_retrieval": retriever_metrics.get('dense_retrieval', {"time_ms": 0, "results": 0}),
            "sparse_retrieval": retriever_metrics.get('sparse_retrieval', {"time_ms": 0, "results": 0}),
            "graph_enhancement": retriever_metrics.get('fusion', {"time_ms": 0, "results": 0}),
            "neural_reranking": retriever_metrics.get('neural_reranking', {"time_ms": 0, "results": 0}),

            # Generation stages
            "prompt_building": generator_metrics.get('prompt_building', {"time_ms": 0, "results": 0}),
            "llm_generation": generator_metrics.get('llm_generation', {"time_ms": 0, "results": 0}),
            "response_parsing": generator_metrics.get('response_parsing', {"time_ms": 0, "results": 0}),
            "confidence_scoring": generator_metrics.get('confidence_scoring', {"time_ms": 0, "results": 0}),
        }
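
# Hedged usage sketch: how the extractors and the formatter are expected to
# compose. `retriever` and `generator` stand in for a ModularUnifiedRetriever
# and an AnswerGenerator; both names are assumptions for illustration only.
def _example_demo_timing(retriever, generator) -> Dict[str, Any]:
    retriever_metrics = ComponentPerformanceExtractor.extract_retriever_metrics(retriever)
    generator_metrics = ComponentPerformanceExtractor.extract_generator_metrics(generator)
    # Missing stages fall back to the {"time_ms": 0, "results": 0} defaults
    return ComponentPerformanceExtractor.create_demo_timing_format(
        retriever_metrics, generator_metrics
    )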
# Global performance instrumentation instance
performance_instrumentation = PerformanceInstrumentation()


@contextmanager
def time_query_pipeline(query: str):
    """Context manager for timing a complete query processing pipeline."""
    pipeline_id = f"query_{int(time.time() * 1000)}"
    timing = performance_instrumentation.start_pipeline(pipeline_id)
    try:
        yield timing, pipeline_id
    finally:
        final_timing = performance_instrumentation.finish_pipeline(pipeline_id)
        if final_timing:
            logger.info(f"Query pipeline completed in {final_timing.total_duration_ms:.2f}ms")
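

# Minimal self-test sketch (an addition for illustration): exercises the module
# end to end with a dummy stage. The query string and stage name are arbitrary
# assumptions, not values used by the real demo.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    with time_query_pipeline("what is RISC-V?") as (timing, pipeline_id):
        with performance_instrumentation.time_stage(pipeline_id, "dummy_stage",
                                                    metadata={"results": 3}):
            time.sleep(0.02)  # stand-in for real pipeline work
    print(timing.get_stage_timings())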