""" |
|
Analytics and usage tracking system for the RAG application. |
|
""" |
|
|
|
import time |
|
import json |
|
from pathlib import Path |
|
from typing import Any, Dict, List, Optional, Union |
|
from collections import defaultdict, deque |
|
from datetime import datetime, timedelta |
|
import threading |
|
from dataclasses import dataclass, asdict |
|
|
|
|
|
@dataclass |
|
class QueryEvent:
    """Represents a query event for analytics."""
    timestamp: float
    query: str
    query_length: int
    search_mode: str
    results_count: int
    search_time: float
    user_session: Optional[str] = None
    metadata_filters: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


@dataclass
class DocumentEvent:
    """Represents a document processing event."""
    timestamp: float
    filename: str
    file_size: int
    file_type: str
    processing_time: float
    chunk_count: int
    success: bool
    error_message: Optional[str] = None
    user_session: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


class AnalyticsManager:
"""Manages analytics and usage tracking for the RAG system.""" |
|
|
|
def __init__(self, config: Dict[str, Any]): |
|
self.config = config |
|
self.analytics_enabled = config.get("ui", {}).get("show_analytics", True) |
|
self.max_events = 10000 |
|
self.save_interval = 300 |
|
|
|
|
|
self.query_events: deque = deque(maxlen=self.max_events) |
|
self.document_events: deque = deque(maxlen=self.max_events) |
|
self.system_events: deque = deque(maxlen=self.max_events) |
|
|
|
|
|
self.active_sessions: Dict[str, Dict[str, Any]] = {} |
|
self.session_timeout = 3600 |
|
|
|
|
|
self.stats = { |
|
"total_queries": 0, |
|
"total_documents_processed": 0, |
|
"total_search_time": 0, |
|
"total_processing_time": 0, |
|
"avg_query_length": 0, |
|
"avg_results_per_query": 0, |
|
"popular_search_modes": defaultdict(int), |
|
"file_type_distribution": defaultdict(int), |
|
"error_count": 0, |
|
"uptime_start": time.time() |
|
} |
|
|
|
|
|
self._lock = threading.RLock() |
|
|
|
|
|
self.cache_dir = Path(config.get("cache", {}).get("cache_dir", "./cache")) |
|
self.analytics_file = self.cache_dir / "analytics.json" |
|
|
|
if self.analytics_enabled: |
|
self._load_persistent_data() |
|
self._start_auto_save() |
|
|
|
def track_query( |
|
        self,
        query: str,
        search_mode: str,
        results_count: int,
        search_time: float,
        user_session: Optional[str] = None,
        metadata_filters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Track a search query event."""
        if not self.analytics_enabled:
            return

        with self._lock:
            event = QueryEvent(
                timestamp=time.time(),
                query=query,
                query_length=len(query),
                search_mode=search_mode,
                results_count=results_count,
                search_time=search_time,
                user_session=user_session or "anonymous",
                metadata_filters=metadata_filters
            )

            self.query_events.append(event)

            # Update aggregate counters
            self.stats["total_queries"] += 1
            self.stats["total_search_time"] += search_time
            self.stats["popular_search_modes"][search_mode] += 1

            # Incrementally update the running averages:
            # new_avg = (old_avg * (n - 1) + new_value) / n
            total_queries = self.stats["total_queries"]
            self.stats["avg_query_length"] = (
                (self.stats["avg_query_length"] * (total_queries - 1) + len(query)) / total_queries
            )
            self.stats["avg_results_per_query"] = (
                (self.stats["avg_results_per_query"] * (total_queries - 1) + results_count) / total_queries
            )

            self._update_session(user_session or "anonymous", "query")

    def track_document_processing(
        self,
        filename: str,
        file_size: int,
        file_type: str,
        processing_time: float,
        chunk_count: int,
        success: bool,
        error_message: Optional[str] = None,
        user_session: Optional[str] = None
    ) -> None:
        """Track a document processing event."""
        if not self.analytics_enabled:
            return

        with self._lock:
            event = DocumentEvent(
                timestamp=time.time(),
                filename=filename,
                file_size=file_size,
                file_type=file_type,
                processing_time=processing_time,
                chunk_count=chunk_count,
                success=success,
                error_message=error_message,
                user_session=user_session or "anonymous"
            )

            self.document_events.append(event)

            # Successful uploads feed the aggregate stats; failures only bump the error count
            if success:
                self.stats["total_documents_processed"] += 1
                self.stats["total_processing_time"] += processing_time
                self.stats["file_type_distribution"][file_type] += 1
            else:
                self.stats["error_count"] += 1

            self._update_session(user_session or "anonymous", "document_upload")

    def track_system_event(self, event_type: str, details: Dict[str, Any]) -> None:
"""Track a system event.""" |
|
if not self.analytics_enabled: |
|
return |
|
|
|
with self._lock: |
|
event = { |
|
"timestamp": time.time(), |
|
"event_type": event_type, |
|
"details": details |
|
} |
|
|
|
self.system_events.append(event) |
|
|
|
def _update_session(self, session_id: str, action_type: str) -> None: |
|
"""Update session information.""" |
|
current_time = time.time() |
|
|
|
if session_id not in self.active_sessions: |
|
self.active_sessions[session_id] = { |
|
"start_time": current_time, |
|
"last_activity": current_time, |
|
"action_count": 0, |
|
"actions": defaultdict(int) |
|
} |
|
|
|
session = self.active_sessions[session_id] |
|
session["last_activity"] = current_time |
|
session["action_count"] += 1 |
|
session["actions"][action_type] += 1 |
|
|
|
def get_query_analytics(self, hours: int = 24) -> Dict[str, Any]: |
|
"""Get query analytics for the specified time period.""" |
|
if not self.analytics_enabled: |
|
return {} |
|
|
|
cutoff_time = time.time() - (hours * 3600) |
|
|
|
with self._lock: |
|
|
|
recent_queries = [ |
|
event for event in self.query_events |
|
if event.timestamp >= cutoff_time |
|
] |
|
|
|
if not recent_queries: |
|
return { |
|
"total_queries": 0, |
|
"avg_search_time": 0, |
|
"avg_query_length": 0, |
|
"search_modes": {}, |
|
"queries_per_hour": [], |
|
"popular_terms": [] |
|
} |
|
|
|
|
|
total_queries = len(recent_queries) |
|
total_search_time = sum(q.search_time for q in recent_queries) |
|
total_query_length = sum(q.query_length for q in recent_queries) |
|
|
|
search_modes = defaultdict(int) |
|
for query in recent_queries: |
|
search_modes[query.search_mode] += 1 |
|
|
|
|
|
hour_buckets = defaultdict(int) |
|
for query in recent_queries: |
|
hour = int((query.timestamp - cutoff_time) // 3600) |
|
hour_buckets[hour] += 1 |
|
|
|
queries_per_hour = [hour_buckets[i] for i in range(hours)] |
|
|
|
|
|
all_terms = [] |
|
for query in recent_queries: |
|
terms = query.query.lower().split() |
|
all_terms.extend([term.strip('.,!?') for term in terms if len(term) > 2]) |
|
|
|
term_counts = defaultdict(int) |
|
for term in all_terms: |
|
term_counts[term] += 1 |
|
|
|
popular_terms = sorted(term_counts.items(), key=lambda x: x[1], reverse=True)[:10] |
|
|
|
return { |
|
"total_queries": total_queries, |
|
"avg_search_time": total_search_time / total_queries if total_queries > 0 else 0, |
|
"avg_query_length": total_query_length / total_queries if total_queries > 0 else 0, |
|
"search_modes": dict(search_modes), |
|
"queries_per_hour": queries_per_hour, |
|
"popular_terms": popular_terms |
|
} |
|
|
|
def get_document_analytics(self, hours: int = 24) -> Dict[str, Any]: |
|
"""Get document processing analytics.""" |
|
if not self.analytics_enabled: |
|
return {} |
|
|
|
cutoff_time = time.time() - (hours * 3600) |
|
|
|
with self._lock: |
|
recent_docs = [ |
|
event for event in self.document_events |
|
if event.timestamp >= cutoff_time |
|
] |
|
|
|
if not recent_docs: |
|
return { |
|
"total_documents": 0, |
|
"successful_uploads": 0, |
|
"failed_uploads": 0, |
|
"avg_processing_time": 0, |
|
"file_types": {}, |
|
"total_chunks_created": 0 |
|
} |
|
|
|
successful_docs = [doc for doc in recent_docs if doc.success] |
|
failed_docs = [doc for doc in recent_docs if not doc.success] |
|
|
|
file_types = defaultdict(int) |
|
total_processing_time = 0 |
|
total_chunks = 0 |
|
|
|
for doc in successful_docs: |
|
file_types[doc.file_type] += 1 |
|
total_processing_time += doc.processing_time |
|
total_chunks += doc.chunk_count |
|
|
|
return { |
|
"total_documents": len(recent_docs), |
|
"successful_uploads": len(successful_docs), |
|
"failed_uploads": len(failed_docs), |
|
"avg_processing_time": ( |
|
total_processing_time / len(successful_docs) |
|
if successful_docs else 0 |
|
), |
|
"file_types": dict(file_types), |
|
"total_chunks_created": total_chunks |
|
} |
|
|
|
def get_system_analytics(self) -> Dict[str, Any]: |
|
"""Get system-wide analytics.""" |
|
with self._lock: |
|
uptime = time.time() - self.stats["uptime_start"] |
|
|
|
|
|
self._cleanup_expired_sessions() |
|
|
|
return { |
|
"uptime_seconds": uptime, |
|
"uptime_hours": uptime / 3600, |
|
"active_sessions": len(self.active_sessions), |
|
"total_queries": self.stats["total_queries"], |
|
"total_documents_processed": self.stats["total_documents_processed"], |
|
"error_count": self.stats["error_count"], |
|
"avg_query_length": self.stats["avg_query_length"], |
|
"avg_results_per_query": self.stats["avg_results_per_query"], |
|
"popular_search_modes": dict(self.stats["popular_search_modes"]), |
|
"file_type_distribution": dict(self.stats["file_type_distribution"]), |
|
"queries_per_hour": ( |
|
self.stats["total_queries"] / (uptime / 3600) |
|
if uptime > 0 else 0 |
|
), |
|
"documents_per_hour": ( |
|
self.stats["total_documents_processed"] / (uptime / 3600) |
|
if uptime > 0 else 0 |
|
) |
|
} |
|
|
|
def _cleanup_expired_sessions(self) -> None: |
|
"""Remove expired sessions.""" |
|
current_time = time.time() |
|
expired_sessions = [ |
|
session_id for session_id, session in self.active_sessions.items() |
|
if current_time - session["last_activity"] > self.session_timeout |
|
] |
|
|
|
for session_id in expired_sessions: |
|
del self.active_sessions[session_id] |
|
|
|
def get_dashboard_data(self) -> Dict[str, Any]: |
|
"""Get comprehensive dashboard data.""" |
|
return { |
|
"system": self.get_system_analytics(), |
|
"queries_24h": self.get_query_analytics(24), |
|
"documents_24h": self.get_document_analytics(24), |
|
"queries_1h": self.get_query_analytics(1), |
|
"last_updated": time.time() |
|
} |
|
|
|
def export_data(self, filepath: Optional[str] = None, hours: int = None) -> str: |
|
"""Export analytics data to JSON file.""" |
|
if filepath is None: |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filepath = str(self.cache_dir / f"analytics_export_{timestamp}.json") |
|
|
|
cutoff_time = time.time() - (hours * 3600) if hours else 0 |
|
|
|
with self._lock: |
|
|
|
if hours: |
|
query_events = [ |
|
event.to_dict() for event in self.query_events |
|
if event.timestamp >= cutoff_time |
|
] |
|
document_events = [ |
|
event.to_dict() for event in self.document_events |
|
if event.timestamp >= cutoff_time |
|
] |
|
else: |
|
query_events = [event.to_dict() for event in self.query_events] |
|
document_events = [event.to_dict() for event in self.document_events] |
|
|
|
export_data = { |
|
"export_timestamp": time.time(), |
|
"system_stats": self.get_system_analytics(), |
|
"query_events": query_events, |
|
"document_events": document_events, |
|
"active_sessions": dict(self.active_sessions) |
|
} |
|
|
|
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True) |
|
with open(filepath, 'w') as f: |
|
json.dump(export_data, f, indent=2, default=str) |
|
|
|
return filepath |
|
|
|
def _save_persistent_data(self) -> None: |
|
"""Save analytics data to persistent storage.""" |
|
if not self.analytics_enabled: |
|
return |
|
|
|
try: |
|
self.cache_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
with self._lock: |
|
data = { |
|
"stats": dict(self.stats), |
|
"query_events": [event.to_dict() for event in list(self.query_events)[-1000:]], |
|
"document_events": [event.to_dict() for event in list(self.document_events)[-1000:]], |
|
"last_save": time.time() |
|
} |
|
|
|
with open(self.analytics_file, 'w') as f: |
|
json.dump(data, f, indent=2, default=str) |
|
|
|
except Exception as e: |
|
print(f"Failed to save analytics data: {e}") |
|
|
|
def _load_persistent_data(self) -> None: |
|
"""Load analytics data from persistent storage.""" |
|
if not self.analytics_file.exists(): |
|
return |
|
|
|
try: |
|
with open(self.analytics_file, 'r') as f: |
|
data = json.load(f) |
|
|
|
|
|
if "stats" in data: |
|
for key, value in data["stats"].items(): |
|
if key in self.stats: |
|
if isinstance(value, dict): |
|
self.stats[key] = defaultdict(int, value) |
|
else: |
|
self.stats[key] = value |
|
|
|
print(f"Loaded analytics data from {self.analytics_file}") |
|
|
|
except Exception as e: |
|
print(f"Failed to load analytics data: {e}") |
|
|
|
def _start_auto_save(self) -> None: |
|
"""Start automatic saving of analytics data.""" |
|
def save_periodically(): |
|
while True: |
|
time.sleep(self.save_interval) |
|
self._save_persistent_data() |
|
|
|
save_thread = threading.Thread(target=save_periodically, daemon=True) |
|
save_thread.start() |
|
|
|
def clear_data(self, confirm: bool = False) -> None: |
|
"""Clear all analytics data.""" |
|
if not confirm: |
|
return |
|
|
|
with self._lock: |
|
self.query_events.clear() |
|
self.document_events.clear() |
|
self.system_events.clear() |
|
self.active_sessions.clear() |
|
|
|
|
|
self.stats = { |
|
"total_queries": 0, |
|
"total_documents_processed": 0, |
|
"total_search_time": 0, |
|
"total_processing_time": 0, |
|
"avg_query_length": 0, |
|
"avg_results_per_query": 0, |
|
"popular_search_modes": defaultdict(int), |
|
"file_type_distribution": defaultdict(int), |
|
"error_count": 0, |
|
"uptime_start": time.time() |
|
} |
|
|
|
|
|
if self.analytics_file.exists(): |
|
self.analytics_file.unlink() |
|
|
|
print("Analytics data cleared") |
|
|
|
def shutdown(self) -> None: |
|
"""Shutdown analytics manager and save data.""" |
|
if self.analytics_enabled: |
|
self._save_persistent_data() |
|
print("Analytics data saved on shutdown") |