""" | |
Analytics and usage tracking system for the RAG application. | |
""" | |
import time
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from collections import defaultdict, deque
from datetime import datetime, timedelta
import threading
from dataclasses import dataclass, asdict
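
# Expected configuration shape (a minimal sketch based on the keys this module
# actually reads; the values shown are the illustrative defaults, not required):
#
#     config = {
#         "ui": {"show_analytics": True},      # toggles all tracking
#         "cache": {"cache_dir": "./cache"},   # where analytics.json is persisted
#     }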


@dataclass
class QueryEvent:
    """Represents a query event for analytics."""
    timestamp: float
    query: str
    query_length: int
    search_mode: str
    results_count: int
    search_time: float
    user_session: Optional[str] = None
    metadata_filters: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class DocumentEvent:
    """Represents a document processing event."""
    timestamp: float
    filename: str
    file_size: int
    file_type: str
    processing_time: float
    chunk_count: int
    success: bool
    error_message: Optional[str] = None
    user_session: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


class AnalyticsManager:
    """Manages analytics and usage tracking for the RAG system."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.analytics_enabled = config.get("ui", {}).get("show_analytics", True)
        self.max_events = 10000  # Maximum events to keep in memory
        self.save_interval = 300  # Save to disk every 5 minutes

        # Event storage
        self.query_events: deque = deque(maxlen=self.max_events)
        self.document_events: deque = deque(maxlen=self.max_events)
        self.system_events: deque = deque(maxlen=self.max_events)

        # Session tracking
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        self.session_timeout = 3600  # 1 hour

        # Aggregated statistics
        self.stats = {
            "total_queries": 0,
            "total_documents_processed": 0,
            "total_search_time": 0,
            "total_processing_time": 0,
            "avg_query_length": 0,
            "avg_results_per_query": 0,
            "popular_search_modes": defaultdict(int),
            "file_type_distribution": defaultdict(int),
            "error_count": 0,
            "uptime_start": time.time()
        }

        # Thread safety
        self._lock = threading.RLock()

        # Auto-save setup
        self.cache_dir = Path(config.get("cache", {}).get("cache_dir", "./cache"))
        self.analytics_file = self.cache_dir / "analytics.json"

        if self.analytics_enabled:
            self._load_persistent_data()
            self._start_auto_save()

    def track_query(
        self,
        query: str,
        search_mode: str,
        results_count: int,
        search_time: float,
        user_session: Optional[str] = None,
        metadata_filters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Track a search query event."""
        if not self.analytics_enabled:
            return

        with self._lock:
            event = QueryEvent(
                timestamp=time.time(),
                query=query,
                query_length=len(query),
                search_mode=search_mode,
                results_count=results_count,
                search_time=search_time,
                user_session=user_session or "anonymous",
                metadata_filters=metadata_filters
            )
            self.query_events.append(event)

            # Update aggregated stats
            self.stats["total_queries"] += 1
            self.stats["total_search_time"] += search_time
            self.stats["popular_search_modes"][search_mode] += 1

            # Update running averages incrementally
            total_queries = self.stats["total_queries"]
            self.stats["avg_query_length"] = (
                (self.stats["avg_query_length"] * (total_queries - 1) + len(query)) / total_queries
            )
            self.stats["avg_results_per_query"] = (
                (self.stats["avg_results_per_query"] * (total_queries - 1) + results_count) / total_queries
            )

            # Update session
            self._update_session(user_session or "anonymous", "query")

    def track_document_processing(
        self,
        filename: str,
        file_size: int,
        file_type: str,
        processing_time: float,
        chunk_count: int,
        success: bool,
        error_message: Optional[str] = None,
        user_session: Optional[str] = None
    ) -> None:
        """Track a document processing event."""
        if not self.analytics_enabled:
            return

        with self._lock:
            event = DocumentEvent(
                timestamp=time.time(),
                filename=filename,
                file_size=file_size,
                file_type=file_type,
                processing_time=processing_time,
                chunk_count=chunk_count,
                success=success,
                error_message=error_message,
                user_session=user_session or "anonymous"
            )
            self.document_events.append(event)

            # Update aggregated stats
            if success:
                self.stats["total_documents_processed"] += 1
                self.stats["total_processing_time"] += processing_time
                self.stats["file_type_distribution"][file_type] += 1
            else:
                self.stats["error_count"] += 1

            # Update session
            self._update_session(user_session or "anonymous", "document_upload")

    def track_system_event(self, event_type: str, details: Dict[str, Any]) -> None:
        """Track a system event."""
        if not self.analytics_enabled:
            return

        with self._lock:
            event = {
                "timestamp": time.time(),
                "event_type": event_type,
                "details": details
            }
            self.system_events.append(event)

    def _update_session(self, session_id: str, action_type: str) -> None:
        """Update session information."""
        current_time = time.time()

        if session_id not in self.active_sessions:
            self.active_sessions[session_id] = {
                "start_time": current_time,
                "last_activity": current_time,
                "action_count": 0,
                "actions": defaultdict(int)
            }

        session = self.active_sessions[session_id]
        session["last_activity"] = current_time
        session["action_count"] += 1
        session["actions"][action_type] += 1

    def get_query_analytics(self, hours: int = 24) -> Dict[str, Any]:
        """Get query analytics for the specified time period."""
        if not self.analytics_enabled:
            return {}

        cutoff_time = time.time() - (hours * 3600)

        with self._lock:
            # Filter events by time
            recent_queries = [
                event for event in self.query_events
                if event.timestamp >= cutoff_time
            ]

            if not recent_queries:
                return {
                    "total_queries": 0,
                    "avg_search_time": 0,
                    "avg_query_length": 0,
                    "search_modes": {},
                    "queries_per_hour": [],
                    "popular_terms": []
                }

            # Calculate metrics
            total_queries = len(recent_queries)
            total_search_time = sum(q.search_time for q in recent_queries)
            total_query_length = sum(q.query_length for q in recent_queries)

            search_modes = defaultdict(int)
            for query in recent_queries:
                search_modes[query.search_mode] += 1

            # Query distribution over time (one bucket per hour since the cutoff)
            hour_buckets = defaultdict(int)
            for query in recent_queries:
                hour = int((query.timestamp - cutoff_time) // 3600)
                hour_buckets[hour] += 1
            queries_per_hour = [hour_buckets[i] for i in range(hours)]

            # Extract popular terms (simple word frequency)
            all_terms = []
            for query in recent_queries:
                terms = query.query.lower().split()
                all_terms.extend([term.strip('.,!?') for term in terms if len(term) > 2])

            term_counts = defaultdict(int)
            for term in all_terms:
                term_counts[term] += 1
            popular_terms = sorted(term_counts.items(), key=lambda x: x[1], reverse=True)[:10]

            return {
                "total_queries": total_queries,
                "avg_search_time": total_search_time / total_queries if total_queries > 0 else 0,
                "avg_query_length": total_query_length / total_queries if total_queries > 0 else 0,
                "search_modes": dict(search_modes),
                "queries_per_hour": queries_per_hour,
                "popular_terms": popular_terms
            }

    def get_document_analytics(self, hours: int = 24) -> Dict[str, Any]:
        """Get document processing analytics."""
        if not self.analytics_enabled:
            return {}

        cutoff_time = time.time() - (hours * 3600)

        with self._lock:
            recent_docs = [
                event for event in self.document_events
                if event.timestamp >= cutoff_time
            ]

            if not recent_docs:
                return {
                    "total_documents": 0,
                    "successful_uploads": 0,
                    "failed_uploads": 0,
                    "avg_processing_time": 0,
                    "file_types": {},
                    "total_chunks_created": 0
                }

            successful_docs = [doc for doc in recent_docs if doc.success]
            failed_docs = [doc for doc in recent_docs if not doc.success]

            file_types = defaultdict(int)
            total_processing_time = 0
            total_chunks = 0
            for doc in successful_docs:
                file_types[doc.file_type] += 1
                total_processing_time += doc.processing_time
                total_chunks += doc.chunk_count

            return {
                "total_documents": len(recent_docs),
                "successful_uploads": len(successful_docs),
                "failed_uploads": len(failed_docs),
                "avg_processing_time": (
                    total_processing_time / len(successful_docs)
                    if successful_docs else 0
                ),
                "file_types": dict(file_types),
                "total_chunks_created": total_chunks
            }

    def get_system_analytics(self) -> Dict[str, Any]:
        """Get system-wide analytics."""
        with self._lock:
            uptime = time.time() - self.stats["uptime_start"]

            # Clean up expired sessions
            self._cleanup_expired_sessions()

            return {
                "uptime_seconds": uptime,
                "uptime_hours": uptime / 3600,
                "active_sessions": len(self.active_sessions),
                "total_queries": self.stats["total_queries"],
                "total_documents_processed": self.stats["total_documents_processed"],
                "error_count": self.stats["error_count"],
                "avg_query_length": self.stats["avg_query_length"],
                "avg_results_per_query": self.stats["avg_results_per_query"],
                "popular_search_modes": dict(self.stats["popular_search_modes"]),
                "file_type_distribution": dict(self.stats["file_type_distribution"]),
                "queries_per_hour": (
                    self.stats["total_queries"] / (uptime / 3600)
                    if uptime > 0 else 0
                ),
                "documents_per_hour": (
                    self.stats["total_documents_processed"] / (uptime / 3600)
                    if uptime > 0 else 0
                )
            }

    def _cleanup_expired_sessions(self) -> None:
        """Remove expired sessions."""
        current_time = time.time()
        expired_sessions = [
            session_id for session_id, session in self.active_sessions.items()
            if current_time - session["last_activity"] > self.session_timeout
        ]
        for session_id in expired_sessions:
            del self.active_sessions[session_id]

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get comprehensive dashboard data."""
        return {
            "system": self.get_system_analytics(),
            "queries_24h": self.get_query_analytics(24),
            "documents_24h": self.get_document_analytics(24),
            "queries_1h": self.get_query_analytics(1),
            "last_updated": time.time()
        }

    def export_data(self, filepath: Optional[str] = None, hours: Optional[int] = None) -> str:
        """Export analytics data to a JSON file and return its path."""
        if filepath is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = str(self.cache_dir / f"analytics_export_{timestamp}.json")

        cutoff_time = time.time() - (hours * 3600) if hours else 0

        with self._lock:
            # Filter events if a time limit was specified
            if hours:
                query_events = [
                    event.to_dict() for event in self.query_events
                    if event.timestamp >= cutoff_time
                ]
                document_events = [
                    event.to_dict() for event in self.document_events
                    if event.timestamp >= cutoff_time
                ]
            else:
                query_events = [event.to_dict() for event in self.query_events]
                document_events = [event.to_dict() for event in self.document_events]

            export_data = {
                "export_timestamp": time.time(),
                "system_stats": self.get_system_analytics(),
                "query_events": query_events,
                "document_events": document_events,
                "active_sessions": dict(self.active_sessions)
            }

        # Save to file
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        with open(filepath, 'w') as f:
            json.dump(export_data, f, indent=2, default=str)

        return filepath

    def _save_persistent_data(self) -> None:
        """Save analytics data to persistent storage."""
        if not self.analytics_enabled:
            return

        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

            with self._lock:
                data = {
                    "stats": dict(self.stats),
                    "query_events": [event.to_dict() for event in list(self.query_events)[-1000:]],  # Last 1000
                    "document_events": [event.to_dict() for event in list(self.document_events)[-1000:]],
                    "last_save": time.time()
                }

            with open(self.analytics_file, 'w') as f:
                json.dump(data, f, indent=2, default=str)
        except Exception as e:
            print(f"Failed to save analytics data: {e}")

    def _load_persistent_data(self) -> None:
        """Load analytics data from persistent storage."""
        if not self.analytics_file.exists():
            return

        try:
            with open(self.analytics_file, 'r') as f:
                data = json.load(f)

            # Restore stats
            if "stats" in data:
                for key, value in data["stats"].items():
                    if key in self.stats:
                        if isinstance(value, dict):
                            self.stats[key] = defaultdict(int, value)
                        else:
                            self.stats[key] = value

            print(f"Loaded analytics data from {self.analytics_file}")
        except Exception as e:
            print(f"Failed to load analytics data: {e}")

    def _start_auto_save(self) -> None:
        """Start automatic saving of analytics data."""
        def save_periodically():
            while True:
                time.sleep(self.save_interval)
                self._save_persistent_data()

        save_thread = threading.Thread(target=save_periodically, daemon=True)
        save_thread.start()

    def clear_data(self, confirm: bool = False) -> None:
        """Clear all analytics data."""
        if not confirm:
            return

        with self._lock:
            self.query_events.clear()
            self.document_events.clear()
            self.system_events.clear()
            self.active_sessions.clear()

            # Reset stats
            self.stats = {
                "total_queries": 0,
                "total_documents_processed": 0,
                "total_search_time": 0,
                "total_processing_time": 0,
                "avg_query_length": 0,
                "avg_results_per_query": 0,
                "popular_search_modes": defaultdict(int),
                "file_type_distribution": defaultdict(int),
                "error_count": 0,
                "uptime_start": time.time()
            }

            # Remove persistent file
            if self.analytics_file.exists():
                self.analytics_file.unlink()

        print("Analytics data cleared")

    def shutdown(self) -> None:
        """Shutdown analytics manager and save data."""
        if self.analytics_enabled:
            self._save_persistent_data()
            print("Analytics data saved on shutdown")