"""
Performance Timing Utilities for Epic 2 Demo
============================================

Provides timing context managers and performance instrumentation for accurate
measurement of component performance in the Epic 2 demo system.
"""

import time
import uuid
import logging
from contextlib import contextmanager
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from threading import Lock

logger = logging.getLogger(__name__)


@dataclass
class TimingResult:
    """Represents a timing measurement result"""
    stage_name: str
    start_time: float
    end_time: float
    duration_ms: float
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    @property
    def duration_seconds(self) -> float:
        return self.duration_ms / 1000.0


@dataclass
class PipelineTimings:
    """Aggregates timing results for a complete pipeline"""
    total_start: float
    total_end: Optional[float] = None
    stages: List[TimingResult] = field(default_factory=list)
    
    @property
    def total_duration_ms(self) -> float:
        if self.total_end is None:
            return 0.0
        return (self.total_end - self.total_start) * 1000.0
    
    def get_stage_timings(self) -> Dict[str, Dict[str, Any]]:
        """Get stage timings in format expected by demo UI"""
        timings = {}
        for stage in self.stages:
            timings[stage.stage_name] = {
                "time_ms": stage.duration_ms,
                "results": stage.metadata.get("results", 0),
                "metadata": stage.metadata
            }
        return timings
    
    def add_stage(self, stage_name: str, duration_ms: float, metadata: Optional[Dict[str, Any]] = None):
        """Add a completed stage timing"""
        current_time = time.time()
        stage = TimingResult(
            stage_name=stage_name,
            start_time=current_time - (duration_ms / 1000.0),
            end_time=current_time,
            duration_ms=duration_ms,
            metadata=metadata or {}
        )
        self.stages.append(stage)
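

# --- Usage sketch (illustrative; not referenced elsewhere in the demo) ---
# Shows the dict shape get_stage_timings() hands to the demo UI. The stage
# name, duration, and metadata values below are hypothetical placeholders.
def _example_pipeline_timings() -> Dict[str, Dict[str, Any]]:
    timings = PipelineTimings(total_start=time.time())
    timings.add_stage("dense_retrieval", 12.5, {"results": 10})
    timings.total_end = time.time()
    # e.g. {"dense_retrieval": {"time_ms": 12.5, "results": 10, "metadata": {...}}}
    return timings.get_stage_timings()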


class PerformanceInstrumentation:
    """Main performance timing instrumentation for Epic 2 demo"""
    
    def __init__(self):
        self._active_timings: Dict[str, PipelineTimings] = {}
        self._lock = Lock()
    
    def start_pipeline(self, pipeline_id: str) -> PipelineTimings:
        """Start timing a new pipeline"""
        with self._lock:
            timing = PipelineTimings(total_start=time.time())
            self._active_timings[pipeline_id] = timing
            return timing
    
    def finish_pipeline(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Finish timing a pipeline and return results"""
        with self._lock:
            if pipeline_id in self._active_timings:
                timing = self._active_timings[pipeline_id]
                timing.total_end = time.time()
                del self._active_timings[pipeline_id]
                return timing
        return None
    
    @contextmanager
    def time_stage(self, pipeline_id: str, stage_name: str, metadata: Optional[Dict[str, Any]] = None):
        """Context manager for timing a pipeline stage"""
        start_time = time.perf_counter()  # monotonic clock: immune to wall-clock jumps
        try:
            yield
        finally:
            end_time = time.perf_counter()
            duration_ms = (end_time - start_time) * 1000.0
            
            with self._lock:
                if pipeline_id in self._active_timings:
                    timing = self._active_timings[pipeline_id]
                    timing.add_stage(stage_name, duration_ms, metadata or {})
                    logger.debug(f"Stage '{stage_name}' completed in {duration_ms:.2f}ms")
    
    def get_timing(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Get current timing for a pipeline"""
        with self._lock:
            return self._active_timings.get(pipeline_id)
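

# --- Usage sketch (illustrative; not referenced elsewhere in the demo) ---
# Shows the intended start_pipeline -> time_stage -> finish_pipeline flow.
# The pipeline id, stage name, and sleep are hypothetical placeholders.
def _example_instrumentation() -> Optional[PipelineTimings]:
    instrumentation = PerformanceInstrumentation()
    instrumentation.start_pipeline("example_pipeline")
    with instrumentation.time_stage("example_pipeline", "dense_retrieval",
                                    {"results": 10}):
        time.sleep(0.01)  # stand-in for real retrieval work
    return instrumentation.finish_pipeline("example_pipeline")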


class ComponentPerformanceExtractor:
    """Extracts performance metrics from RAG system components"""
    
    @staticmethod
    def extract_retriever_metrics(retriever) -> Dict[str, Any]:
        """Extract detailed timing metrics from ModularUnifiedRetriever"""
        metrics = {}
        
        # Try to get performance metrics from the retriever
        if hasattr(retriever, 'get_metrics'):
            component_metrics = retriever.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                retrieval_stats = component_metrics.get('retrieval_stats', {})
                
                # Get sub-component statistics
                sub_components = component_metrics.get('sub_components', {})
                
                # Extract reranker statistics
                reranker_stats = sub_components.get('reranker', {}).get('statistics', {})
                fusion_stats = sub_components.get('fusion_strategy', {}).get('statistics', {})
                
                # Create metrics in expected format
                metrics['dense_retrieval'] = {
                    'time_ms': retrieval_stats.get('last_retrieval_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0)
                }
                metrics['sparse_retrieval'] = {
                    'time_ms': retrieval_stats.get('avg_time', 0) * 1000,
                    'results': component_metrics.get('indexed_documents', 0)
                }
                metrics['fusion'] = {
                    'time_ms': fusion_stats.get('avg_graph_latency_ms', 0),
                    'results': fusion_stats.get('total_fusions', 0)
                }
                metrics['neural_reranking'] = {
                    'time_ms': reranker_stats.get('total_latency_ms', 0),
                    'results': reranker_stats.get('successful_queries', 0)
                }
                
                # Total retrieval time
                metrics['total_retrieval_time_ms'] = retrieval_stats.get('total_time', 0) * 1000
        
        return metrics
    
    @staticmethod
    def extract_generator_metrics(generator) -> Dict[str, Any]:
        """Extract detailed timing metrics from AnswerGenerator"""
        metrics = {}
        
        # Try to get performance metrics from the generator
        if hasattr(generator, 'get_metrics'):
            component_metrics = generator.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                generation_count = component_metrics.get('generation_count', 0)
                total_time = component_metrics.get('total_time', 0)
                avg_time = component_metrics.get('avg_time', 0)
                
                # Get sub-component information
                sub_components = component_metrics.get('sub_components', {})
                llm_client = sub_components.get('llm_client', {})
                
                # Create metrics in expected format
                metrics['prompt_building'] = {
                    'time_ms': avg_time * 1000 * 0.1,  # Estimate 10% of total time
                    'results': generation_count
                }
                metrics['llm_generation'] = {
                    'time_ms': avg_time * 1000 * 0.8,  # Estimate 80% of total time
                    'results': generation_count
                }
                metrics['response_parsing'] = {
                    'time_ms': avg_time * 1000 * 0.05,  # Estimate 5% of total time
                    'results': generation_count
                }
                metrics['confidence_scoring'] = {
                    'time_ms': avg_time * 1000 * 0.05,  # Estimate 5% of total time
                    'results': generation_count
                }
                
                # Total generation time
                metrics['total_generation_time_ms'] = total_time * 1000
        
        return metrics
    
    @staticmethod
    def create_demo_timing_format(retriever_metrics: Dict[str, Any], 
                                 generator_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Create timing format expected by the demo UI"""
        return {
            # Retrieval stages
            "dense_retrieval": retriever_metrics.get('dense_retrieval', {"time_ms": 0, "results": 0}),
            "sparse_retrieval": retriever_metrics.get('sparse_retrieval', {"time_ms": 0, "results": 0}),
            "graph_enhancement": retriever_metrics.get('fusion', {"time_ms": 0, "results": 0}),
            "neural_reranking": retriever_metrics.get('neural_reranking', {"time_ms": 0, "results": 0}),
            
            # Generation stages
            "prompt_building": generator_metrics.get('prompt_building', {"time_ms": 0, "results": 0}),
            "llm_generation": generator_metrics.get('llm_generation', {"time_ms": 0, "results": 0}),
            "response_parsing": generator_metrics.get('response_parsing', {"time_ms": 0, "results": 0}),
            "confidence_scoring": generator_metrics.get('confidence_scoring', {"time_ms": 0, "results": 0}),
        }
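

# --- Usage sketch (illustrative; not referenced elsewhere in the demo) ---
# Exercises the extractors against stub components whose get_metrics()
# payloads mirror the keys read above. The stub classes and every numeric
# value are hypothetical; real components report their own statistics.
class _StubRetriever:
    def get_metrics(self) -> Dict[str, Any]:
        return {
            "retrieval_stats": {"last_retrieval_time": 0.012,
                                "avg_time": 0.015, "total_time": 0.150},
            "indexed_documents": 100,
            "sub_components": {
                "reranker": {"statistics": {"total_latency_ms": 45.0,
                                            "successful_queries": 10}},
                "fusion_strategy": {"statistics": {"avg_graph_latency_ms": 5.0,
                                                   "total_fusions": 10}},
            },
        }


class _StubGenerator:
    def get_metrics(self) -> Dict[str, Any]:
        return {"generation_count": 10, "total_time": 12.0, "avg_time": 1.2,
                "sub_components": {"llm_client": {}}}


def _example_demo_timing_format() -> Dict[str, Any]:
    extractor = ComponentPerformanceExtractor
    return extractor.create_demo_timing_format(
        extractor.extract_retriever_metrics(_StubRetriever()),
        extractor.extract_generator_metrics(_StubGenerator()),
    )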


# Global performance instrumentation instance
performance_instrumentation = PerformanceInstrumentation()


@contextmanager
def time_query_pipeline(query: str):
    """Context manager for timing a complete query processing pipeline"""
    pipeline_id = f"query_{int(time.time() * 1000)}"
    timing = performance_instrumentation.start_pipeline(pipeline_id)
    
    try:
        yield timing, pipeline_id
    finally:
        final_timing = performance_instrumentation.finish_pipeline(pipeline_id)
        if final_timing:
            logger.info(f"Query pipeline completed in {final_timing.total_duration_ms:.2f}ms")