""" | |
Real-time Metrics Collection for Advanced Retriever. | |
This module collects and aggregates real-time metrics from the advanced | |
retriever system for analytics dashboard and performance monitoring. | |
""" | |
import time | |
import logging | |
from typing import Dict, Any, List, Optional | |
from collections import defaultdict, deque | |
from dataclasses import dataclass, asdict | |
import threading | |
import json | |
from pathlib import Path | |
logger = logging.getLogger(__name__) | |
@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    query_id: str
    query_text: str
    timestamp: float

    # Latency metrics (milliseconds)
    total_latency: float
    dense_retrieval_latency: float
    sparse_retrieval_latency: float
    graph_retrieval_latency: float
    neural_reranking_latency: float

    # Quality metrics
    num_results: int
    relevance_scores: List[float]
    confidence_score: float

    # Component usage
    components_used: List[str]
    backend_used: str

    # Stage timing information (Epic 2 Week 4 enhancement)
    stage_times: Optional[Dict[str, float]] = None
    stage_count: Optional[int] = None

    # User interaction
    user_id: Optional[str] = None
    session_id: Optional[str] = None
@dataclass
class SystemMetrics:
    """System-wide performance metrics."""
    timestamp: float

    # Performance
    queries_per_second: float
    avg_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float

    # Quality
    avg_relevance_score: float
    avg_confidence_score: float
    success_rate: float

    # Resource usage
    memory_usage_mb: float
    cpu_usage_percent: float

    # Component status
    active_components: List[str]
    component_health: Dict[str, str]
class MetricsCollector:
    """
    Real-time metrics collector for advanced retriever analytics.

    This class collects, aggregates, and provides access to real-time
    metrics from the advanced retriever system for dashboard visualization
    and performance monitoring.
    """

    def __init__(self,
                 max_query_history: int = 10000,
                 aggregation_window_seconds: int = 60,
                 storage_path: Optional[Path] = None):
        """
        Initialize metrics collector.

        Args:
            max_query_history: Maximum number of query metrics to keep
            aggregation_window_seconds: Window for aggregating metrics
            storage_path: Optional path to persist metrics
        """
        self.max_query_history = max_query_history
        self.aggregation_window_seconds = aggregation_window_seconds
        self.storage_path = storage_path

        # Query-level metrics storage
        self.query_metrics: deque = deque(maxlen=max_query_history)
        self.query_metrics_lock = threading.RLock()

        # System-level metrics
        self.system_metrics: deque = deque(maxlen=1440)  # 24 hours at 1-minute intervals
        self.system_metrics_lock = threading.RLock()

        # Real-time aggregation
        self.current_window_start = time.time()
        self.current_window_queries = []

        # Component tracking
        self.component_metrics = defaultdict(lambda: {
            "total_calls": 0,
            "total_latency": 0.0,
            "error_count": 0,
            "last_used": 0.0
        })

        # Backend performance tracking
        self.backend_metrics = defaultdict(lambda: {
            "total_queries": 0,
            "total_latency": 0.0,
            "success_count": 0,
            "error_count": 0
        })

        # Dashboard data cache
        self.dashboard_cache = {}
        self.cache_timestamp = 0.0
        self.cache_ttl_seconds = 5.0  # 5-second cache

        logger.info(f"MetricsCollector initialized with {max_query_history} query history")
    def record_query_metrics(self, metrics: QueryMetrics) -> None:
        """
        Record metrics for a completed query.

        Args:
            metrics: Query metrics to record
        """
        with self.query_metrics_lock:
            self.query_metrics.append(metrics)
            self.current_window_queries.append(metrics)

        # Update component metrics
        self._update_component_metrics(metrics)

        # Update backend metrics
        self._update_backend_metrics(metrics)

        # Check if we should aggregate system metrics
        if time.time() - self.current_window_start >= self.aggregation_window_seconds:
            self._aggregate_system_metrics()

        # Invalidate dashboard cache
        self.cache_timestamp = 0.0

        logger.debug(f"Recorded query metrics: {metrics.query_id}")
    def record_component_usage(self,
                               component_name: str,
                               latency_ms: float,
                               success: bool = True) -> None:
        """
        Record component usage metrics.

        Args:
            component_name: Name of the component
            latency_ms: Processing latency in milliseconds
            success: Whether the operation was successful
        """
        metrics = self.component_metrics[component_name]
        metrics["total_calls"] += 1
        metrics["total_latency"] += latency_ms
        metrics["last_used"] = time.time()

        if not success:
            metrics["error_count"] += 1
    def get_real_time_dashboard_data(self) -> Dict[str, Any]:
        """
        Get real-time data for dashboard visualization.

        Returns:
            Dictionary with dashboard data
        """
        # Check cache first
        current_time = time.time()
        if (current_time - self.cache_timestamp) < self.cache_ttl_seconds and self.dashboard_cache:
            return self.dashboard_cache

        # Generate fresh dashboard data
        dashboard_data = {
            "timestamp": current_time,
            "overview": self._get_overview_metrics(),
            "performance": self._get_performance_metrics(),
            "quality": self._get_quality_metrics(),
            "components": self._get_component_metrics(),
            "backends": self._get_backend_metrics(),
            "recent_queries": self._get_recent_queries(),
            "time_series": self._get_time_series_data()
        }

        # Update cache
        self.dashboard_cache = dashboard_data
        self.cache_timestamp = current_time

        return dashboard_data
    def _get_overview_metrics(self) -> Dict[str, Any]:
        """Get high-level overview metrics."""
        with self.query_metrics_lock:
            recent_queries = list(self.query_metrics)[-100:]  # Last 100 queries

        if not recent_queries:
            return {
                "total_queries": 0,
                "queries_per_minute": 0.0,
                "avg_latency_ms": 0.0,
                "success_rate": 0.0
            }

        # Calculate queries per minute
        if len(recent_queries) >= 2:
            time_span = recent_queries[-1].timestamp - recent_queries[0].timestamp
            queries_per_minute = (len(recent_queries) / max(time_span, 1)) * 60
        else:
            queries_per_minute = 0.0

        # Calculate average latency
        avg_latency = sum(q.total_latency for q in recent_queries) / len(recent_queries)

        # Calculate success rate (assume success if confidence > 0.5)
        successful_queries = sum(1 for q in recent_queries if q.confidence_score > 0.5)
        success_rate = successful_queries / len(recent_queries) * 100

        return {
            "total_queries": len(self.query_metrics),
            "queries_per_minute": queries_per_minute,
            "avg_latency_ms": avg_latency,
            "success_rate": success_rate
        }
    def _get_performance_metrics(self) -> Dict[str, Any]:
        """Get detailed performance metrics."""
        with self.query_metrics_lock:
            recent_queries = list(self.query_metrics)[-1000:]  # Last 1000 queries

        if not recent_queries:
            return {}

        # Calculate latency percentiles
        latencies = [q.total_latency for q in recent_queries]
        latencies.sort()
        n = len(latencies)

        p50 = latencies[int(n * 0.5)] if n > 0 else 0.0
        p95 = latencies[int(n * 0.95)] if n > 0 else 0.0
        p99 = latencies[int(n * 0.99)] if n > 0 else 0.0

        # Component latency breakdown
        component_latencies = {
            "dense_retrieval": sum(q.dense_retrieval_latency for q in recent_queries) / len(recent_queries),
            "sparse_retrieval": sum(q.sparse_retrieval_latency for q in recent_queries) / len(recent_queries),
            "graph_retrieval": sum(q.graph_retrieval_latency for q in recent_queries) / len(recent_queries),
            "neural_reranking": sum(q.neural_reranking_latency for q in recent_queries) / len(recent_queries)
        }

        return {
            "latency_percentiles": {
                "p50": p50,
                "p95": p95,
                "p99": p99
            },
            "component_latencies": component_latencies,
            "throughput": {
                # Approximation: treats the recent sample as if it spanned a 60-second window
                "current_qps": len(recent_queries) / 60.0 if recent_queries else 0.0,
                "peak_qps": self._calculate_peak_qps(recent_queries)
            }
        }
    def _get_quality_metrics(self) -> Dict[str, Any]:
        """Get quality-related metrics."""
        with self.query_metrics_lock:
            recent_queries = list(self.query_metrics)[-1000:]

        if not recent_queries:
            return {}

        # Average relevance scores
        avg_relevance = 0.0
        total_results = 0
        for query in recent_queries:
            if query.relevance_scores:
                avg_relevance += sum(query.relevance_scores)
                total_results += len(query.relevance_scores)

        if total_results > 0:
            avg_relevance /= total_results

        # Confidence distribution
        confidences = [q.confidence_score for q in recent_queries]
        high_confidence = sum(1 for c in confidences if c > 0.8)
        medium_confidence = sum(1 for c in confidences if 0.5 <= c <= 0.8)
        low_confidence = sum(1 for c in confidences if c < 0.5)

        return {
            "avg_relevance_score": avg_relevance,
            "avg_confidence_score": sum(confidences) / len(confidences) if confidences else 0.0,
            "confidence_distribution": {
                "high": high_confidence,
                "medium": medium_confidence,
                "low": low_confidence
            },
            "avg_results_per_query": sum(q.num_results for q in recent_queries) / len(recent_queries)
        }
    def _get_component_metrics(self) -> Dict[str, Any]:
        """Get component usage and performance metrics."""
        components_data = {}

        for component_name, metrics in self.component_metrics.items():
            if metrics["total_calls"] > 0:
                avg_latency = metrics["total_latency"] / metrics["total_calls"]
                error_rate = metrics["error_count"] / metrics["total_calls"] * 100

                components_data[component_name] = {
                    "total_calls": metrics["total_calls"],
                    "avg_latency_ms": avg_latency,
                    "error_rate": error_rate,
                    "last_used": metrics["last_used"],
                    "status": "healthy" if error_rate < 5 else "warning" if error_rate < 15 else "error"
                }

        return components_data
    def _get_backend_metrics(self) -> Dict[str, Any]:
        """Get backend performance metrics."""
        backends_data = {}

        for backend_name, metrics in self.backend_metrics.items():
            if metrics["total_queries"] > 0:
                avg_latency = metrics["total_latency"] / metrics["total_queries"]
                success_rate = metrics["success_count"] / metrics["total_queries"] * 100

                backends_data[backend_name] = {
                    "total_queries": metrics["total_queries"],
                    "avg_latency_ms": avg_latency,
                    "success_rate": success_rate,
                    "error_count": metrics["error_count"]
                }

        return backends_data
    def _get_recent_queries(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent query details."""
        with self.query_metrics_lock:
            recent_queries = list(self.query_metrics)[-limit:]

        query_data = []
        for query in recent_queries:
            query_data.append({
                "query_id": query.query_id,
                "query_text": query.query_text[:100] + "..." if len(query.query_text) > 100 else query.query_text,
                "timestamp": query.timestamp,
                "total_latency": query.total_latency,
                "confidence_score": query.confidence_score,
                "num_results": query.num_results,
                "backend_used": query.backend_used,
                "components_used": query.components_used
            })

        return query_data
    def _get_time_series_data(self) -> Dict[str, Any]:
        """Get time series data for charts."""
        with self.system_metrics_lock:
            system_data = list(self.system_metrics)[-60:]  # Last 60 minutes

        if not system_data:
            return {}

        timestamps = [s.timestamp for s in system_data]
        latencies = [s.avg_latency_ms for s in system_data]
        qps = [s.queries_per_second for s in system_data]
        success_rates = [s.success_rate for s in system_data]

        return {
            "timestamps": timestamps,
            "latency": latencies,
            "qps": qps,
            "success_rate": success_rates
        }
    def _update_component_metrics(self, query_metrics: QueryMetrics) -> None:
        """Update component-level metrics from query."""
        for component in query_metrics.components_used:
            # Attribute an equal share of the total latency to each component used
            self.record_component_usage(component, query_metrics.total_latency / len(query_metrics.components_used))

    def _update_backend_metrics(self, query_metrics: QueryMetrics) -> None:
        """Update backend-level metrics from query."""
        backend = query_metrics.backend_used
        metrics = self.backend_metrics[backend]

        metrics["total_queries"] += 1
        metrics["total_latency"] += query_metrics.total_latency

        if query_metrics.confidence_score > 0.5:
            metrics["success_count"] += 1
        else:
            metrics["error_count"] += 1
    def _aggregate_system_metrics(self) -> None:
        """Aggregate current window into system metrics."""
        if not self.current_window_queries:
            return

        current_time = time.time()
        window_duration = current_time - self.current_window_start

        # Calculate aggregated metrics
        total_queries = len(self.current_window_queries)
        qps = total_queries / window_duration if window_duration > 0 else 0.0

        latencies = [q.total_latency for q in self.current_window_queries]
        avg_latency = sum(latencies) / len(latencies)

        latencies.sort()
        n = len(latencies)
        p95_latency = latencies[int(n * 0.95)] if n > 0 else 0.0
        p99_latency = latencies[int(n * 0.99)] if n > 0 else 0.0

        # Quality metrics
        confidences = [q.confidence_score for q in self.current_window_queries]
        avg_confidence = sum(confidences) / len(confidences)

        relevance_scores = []
        for query in self.current_window_queries:
            relevance_scores.extend(query.relevance_scores)
        avg_relevance = sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0.0

        success_count = sum(1 for q in self.current_window_queries if q.confidence_score > 0.5)
        success_rate = success_count / total_queries * 100

        # Create system metrics
        system_metrics = SystemMetrics(
            timestamp=current_time,
            queries_per_second=qps,
            avg_latency_ms=avg_latency,
            p95_latency_ms=p95_latency,
            p99_latency_ms=p99_latency,
            avg_relevance_score=avg_relevance,
            avg_confidence_score=avg_confidence,
            success_rate=success_rate,
            memory_usage_mb=0.0,  # Placeholder
            cpu_usage_percent=0.0,  # Placeholder
            active_components=list(set(comp for q in self.current_window_queries for comp in q.components_used)),
            component_health={}  # Placeholder
        )

        # Store system metrics
        with self.system_metrics_lock:
            self.system_metrics.append(system_metrics)

        # Reset window
        self.current_window_start = current_time
        self.current_window_queries = []

        logger.debug(f"Aggregated system metrics: QPS={qps:.2f}, avg_latency={avg_latency:.1f}ms")
    def _calculate_peak_qps(self, queries: List[QueryMetrics], window_seconds: int = 60) -> float:
        """Calculate peak queries per second in a sliding window."""
        if len(queries) < 2:
            return 0.0

        max_qps = 0.0
        for i in range(len(queries)):
            window_start = queries[i].timestamp
            window_end = window_start + window_seconds

            # Count queries in this window
            count = 0
            for j in range(i, len(queries)):
                if queries[j].timestamp <= window_end:
                    count += 1
                else:
                    break

            qps = count / window_seconds
            max_qps = max(max_qps, qps)

        return max_qps
    def export_metrics(self, filepath: Path) -> None:
        """
        Export metrics to file.

        Args:
            filepath: Path to export file
        """
        try:
            export_data = {
                "export_timestamp": time.time(),
                "query_metrics": [asdict(q) for q in self.query_metrics],
                "system_metrics": [asdict(s) for s in self.system_metrics],
                "component_metrics": dict(self.component_metrics),
                "backend_metrics": dict(self.backend_metrics)
            }

            with open(filepath, 'w') as f:
                json.dump(export_data, f, indent=2)

            logger.info(f"Metrics exported to {filepath}")

        except Exception as e:
            logger.error(f"Failed to export metrics: {e}")
    def get_summary_stats(self) -> Dict[str, Any]:
        """
        Get summary statistics.

        Returns:
            Dictionary with summary statistics
        """
        with self.query_metrics_lock:
            total_queries = len(self.query_metrics)

        with self.system_metrics_lock:
            total_system_metrics = len(self.system_metrics)

        return {
            "total_queries_recorded": total_queries,
            "total_system_metrics": total_system_metrics,
            "active_components": len(self.component_metrics),
            "active_backends": len(self.backend_metrics),
            "collection_window_seconds": self.aggregation_window_seconds,
            "max_query_history": self.max_query_history,
            "cache_ttl_seconds": self.cache_ttl_seconds
        }
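

# Minimal usage sketch (illustrative only): the query text, component names,
# backend name, and numeric values below are made-up example inputs, not part
# of the retriever system itself. It shows how a caller might record one
# query's metrics and read back the aggregated dashboard and summary data.
if __name__ == "__main__":
    collector = MetricsCollector(max_query_history=1000, aggregation_window_seconds=60)

    example_query = QueryMetrics(
        query_id="q-001",                       # hypothetical identifiers and values
        query_text="example query about retrieval latency",
        timestamp=time.time(),
        total_latency=123.4,
        dense_retrieval_latency=40.0,
        sparse_retrieval_latency=25.0,
        graph_retrieval_latency=30.0,
        neural_reranking_latency=28.4,
        num_results=10,
        relevance_scores=[0.9, 0.8, 0.75],
        confidence_score=0.82,
        components_used=["dense_retrieval", "neural_reranking"],
        backend_used="example_backend",
    )

    collector.record_query_metrics(example_query)

    # Dashboard overview and summary stats are plain dicts, suitable for JSON serialization
    print(json.dumps(collector.get_real_time_dashboard_data()["overview"], indent=2))
    print(json.dumps(collector.get_summary_stats(), indent=2))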