"""
Analytics Database Module for Query Logging and Performance Tracking.

Tracks every query, method, answer, and citation for comprehensive analytics.
"""

import json
import logging
import sqlite3
import time
from collections import Counter
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from config import DATA_DIR

logger = logging.getLogger(__name__)

# Database path
ANALYTICS_DB = DATA_DIR / "analytics.db"


class AnalyticsDB:
    """Database manager for query analytics and logging.

    Every method opens its own short-lived SQLite connection to
    ``self.db_path`` and closes it before returning.
    """

    def __init__(self):
        self.db_path = ANALYTICS_DB
        self._init_database()

    def _init_database(self):
        """Create the ``queries``, ``citations`` and ``performance_metrics``
        tables if they do not exist yet."""
        # ``closing`` guarantees the connection is released even when one of
        # the CREATE TABLE statements raises.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Main queries table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS queries (
                    query_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    user_query TEXT NOT NULL,
                    retrieval_method TEXT NOT NULL,
                    answer TEXT NOT NULL,
                    response_time_ms REAL,
                    num_citations INTEGER DEFAULT 0,
                    image_path TEXT,
                    error_message TEXT,
                    top_k_used INTEGER DEFAULT 5,
                    additional_settings TEXT,
                    answer_length INTEGER,
                    session_id TEXT
                )
            ''')

            # Citations table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS citations (
                    citation_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    source TEXT NOT NULL,
                    citation_type TEXT,
                    relevance_score REAL,
                    bm25_score REAL,
                    rerank_score REAL,
                    similarity_score REAL,
                    url TEXT,
                    path TEXT,
                    rank INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')

            # Performance metrics table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    retrieval_time_ms REAL,
                    generation_time_ms REAL,
                    total_time_ms REAL,
                    chunks_retrieved INTEGER,
                    tokens_estimated INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')

            conn.commit()

        logger.info("Analytics database initialized")

    def log_query(self, user_query: str, method: str, answer: str,
                  citations: Optional[List[Dict]],
                  response_time: Optional[float] = None,
                  image_path: Optional[str] = None,
                  error_message: Optional[str] = None,
                  top_k: int = 5,
                  additional_settings: Optional[Dict] = None,
                  session_id: Optional[str] = None) -> Optional[int]:
        """
        Log a complete query interaction.

        The query row and all of its citation rows are written in a single
        transaction: either everything is committed or nothing is.

        Args:
            user_query: The user's question
            method: Retrieval method used
            answer: Generated answer
            citations: List of citation dictionaries (``None`` is treated as
                an empty list)
            response_time: Time taken in milliseconds
            image_path: Path to uploaded image (if any)
            error_message: Error message (if any)
            top_k: Number of chunks retrieved
            additional_settings: Method-specific settings, stored as JSON
            session_id: Session identifier

        Returns:
            The ID of the logged query, or ``None`` if the insert failed
            (the error is logged and the transaction rolled back).
        """
        # A missing citation list should not discard the whole query record.
        if citations is None:
            citations = []

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                # Insert main query record
                cursor.execute('''
                    INSERT INTO queries (
                        timestamp, user_query, retrieval_method, answer,
                        response_time_ms, num_citations, image_path,
                        error_message, top_k_used, additional_settings,
                        answer_length, session_id
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    datetime.now().isoformat(),
                    user_query,
                    method,
                    answer,
                    response_time,
                    len(citations),
                    image_path,
                    error_message,
                    top_k,
                    json.dumps(additional_settings) if additional_settings else None,
                    len(answer),
                    session_id,
                ))

                query_id = cursor.lastrowid

                # Insert citations; rank is the 1-based position in the list.
                cursor.executemany('''
                    INSERT INTO citations (
                        query_id, source, citation_type, relevance_score,
                        bm25_score, rerank_score, similarity_score,
                        url, path, rank
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', [
                    (
                        query_id,
                        citation.get('source', ''),
                        citation.get('type', ''),
                        citation.get('relevance_score'),
                        citation.get('bm25_score'),
                        citation.get('rerank_score'),
                        citation.get('similarity_score'),
                        citation.get('url'),
                        citation.get('path'),
                        rank,
                    )
                    for rank, citation in enumerate(citations, 1)
                ])

                conn.commit()
                logger.info("Logged query %s with %s citations",
                            query_id, len(citations))
                return query_id

            except Exception as e:
                logger.error("Error logging query: %s", e)
                conn.rollback()
                return None

    def get_query_stats(self, days: int = 30) -> Dict[str, Any]:
        """Get comprehensive query statistics for the last ``days`` days.

        Returns:
            A dict with the keys ``total_queries``, ``method_usage``,
            ``avg_response_times``, ``avg_citations``, ``total_citations``,
            ``citation_types``, ``error_rate`` (percentage) and
            ``top_keywords``; an empty dict if any query fails.
        """
        since_date = (datetime.now() - timedelta(days=days)).isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                stats = {}

                # Total queries
                cursor.execute('''
                    SELECT COUNT(*) FROM queries
                    WHERE timestamp >= ?
                ''', (since_date,))
                stats['total_queries'] = cursor.fetchone()[0]

                # Method usage
                cursor.execute('''
                    SELECT retrieval_method, COUNT(*) as count
                    FROM queries
                    WHERE timestamp >= ?
                    GROUP BY retrieval_method
                    ORDER BY count DESC
                ''', (since_date,))
                stats['method_usage'] = dict(cursor.fetchall())

                # Average response times by method
                cursor.execute('''
                    SELECT retrieval_method, AVG(response_time_ms) as avg_time
                    FROM queries
                    WHERE timestamp >= ? AND response_time_ms IS NOT NULL
                    GROUP BY retrieval_method
                ''', (since_date,))
                stats['avg_response_times'] = dict(cursor.fetchall())

                # Citation statistics (aggregates are NULL on an empty set)
                cursor.execute('''
                    SELECT AVG(num_citations) as avg_citations,
                           SUM(num_citations) as total_citations
                    FROM queries
                    WHERE timestamp >= ?
                ''', (since_date,))
                avg_citations, total_citations = cursor.fetchone()
                stats['avg_citations'] = avg_citations or 0
                stats['total_citations'] = total_citations or 0

                # Citation types
                cursor.execute('''
                    SELECT c.citation_type, COUNT(*) as count
                    FROM citations c
                    JOIN queries q ON c.query_id = q.query_id
                    WHERE q.timestamp >= ?
                    GROUP BY c.citation_type
                    ORDER BY count DESC
                ''', (since_date,))
                stats['citation_types'] = dict(cursor.fetchall())

                # Error rate
                cursor.execute('''
                    SELECT
                        COUNT(CASE WHEN error_message IS NOT NULL THEN 1 END) as errors,
                        COUNT(*) as total
                    FROM queries
                    WHERE timestamp >= ?
                ''', (since_date,))
                errors, total = cursor.fetchone()
                stats['error_rate'] = (errors / total) * 100 if total > 0 else 0

                # Most common query topics (simple word analysis): count
                # every whitespace-separated, lower-cased word longer than
                # three characters.
                cursor.execute('''
                    SELECT user_query FROM queries
                    WHERE timestamp >= ?
                ''', (since_date,))
                keyword_counts = Counter(
                    word
                    for (text,) in cursor.fetchall()
                    for word in text.lower().split()
                    if len(word) > 3
                )

                # Top 10 keywords
                stats['top_keywords'] = dict(keyword_counts.most_common(10))

                return stats

            except Exception as e:
                logger.error("Error getting query stats: %s", e)
                return {}

    def get_method_performance(self) -> Dict[str, Dict[str, float]]:
        """Get detailed performance metrics by method.

        Only queries with a recorded response time are considered.

        Returns:
            ``{method: {'avg_response_time', 'avg_citations',
            'avg_answer_length', 'query_count'}}``; an empty dict on error.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                cursor.execute('''
                    SELECT
                        retrieval_method,
                        AVG(response_time_ms) as avg_response_time,
                        AVG(num_citations) as avg_citations,
                        AVG(answer_length) as avg_answer_length,
                        COUNT(*) as query_count
                    FROM queries
                    WHERE response_time_ms IS NOT NULL
                    GROUP BY retrieval_method
                ''')

                return {
                    method: {
                        'avg_response_time': avg_time,
                        'avg_citations': avg_cites,
                        'avg_answer_length': avg_length,
                        'query_count': count,
                    }
                    for method, avg_time, avg_cites, avg_length, count
                    in cursor.fetchall()
                }

            except Exception as e:
                logger.error("Error getting method performance: %s", e)
                return {}

    def get_recent_queries(self, limit: int = 20,
                           include_answers: bool = True) -> List[Dict[str, Any]]:
        """Get recent queries with basic information and optionally full answers.

        Args:
            limit: Maximum number of rows to return, newest first.
            include_answers: When true, each row also carries the ``answer``
                and ``error_message`` columns.

        Returns:
            A list of dicts, one per query; an empty list on error.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                if include_answers:
                    cursor.execute('''
                        SELECT query_id, timestamp, user_query, retrieval_method,
                               answer, answer_length, num_citations,
                               response_time_ms, error_message
                        FROM queries
                        ORDER BY timestamp DESC
                        LIMIT ?
                    ''', (limit,))
                    columns = ['query_id', 'timestamp', 'query', 'method',
                               'answer', 'answer_length', 'citations',
                               'response_time', 'error_message']
                else:
                    cursor.execute('''
                        SELECT query_id, timestamp, user_query, retrieval_method,
                               answer_length, num_citations, response_time_ms
                        FROM queries
                        ORDER BY timestamp DESC
                        LIMIT ?
                    ''', (limit,))
                    columns = ['query_id', 'timestamp', 'query', 'method',
                               'answer_length', 'citations', 'response_time']

                return [dict(zip(columns, row)) for row in cursor.fetchall()]

            except Exception as e:
                logger.error("Error getting recent queries: %s", e)
                return []

    def get_query_with_citations(self, query_id: int) -> Dict[str, Any]:
        """Get full query details including citations.

        Returns:
            The query's fields plus a ``citations`` list ordered by rank;
            an empty dict if the query does not exist or on error.
        """
        query_keys = ('query_id', 'timestamp', 'user_query', 'method',
                      'answer', 'response_time', 'num_citations',
                      'error_message', 'top_k_used')
        citation_keys = ('source', 'type', 'relevance_score', 'bm25_score',
                         'rerank_score', 'similarity_score', 'url', 'path',
                         'rank')

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                # Get query details
                cursor.execute('''
                    SELECT query_id, timestamp, user_query, retrieval_method,
                           answer, response_time_ms, num_citations,
                           error_message, top_k_used
                    FROM queries
                    WHERE query_id = ?
                ''', (query_id,))

                query_row = cursor.fetchone()
                if not query_row:
                    return {}

                query_data = dict(zip(query_keys, query_row))

                # Get citations
                cursor.execute('''
                    SELECT source, citation_type, relevance_score, bm25_score,
                           rerank_score, similarity_score, url, path, rank
                    FROM citations
                    WHERE query_id = ?
                    ORDER BY rank
                ''', (query_id,))

                query_data['citations'] = [
                    dict(zip(citation_keys, row)) for row in cursor.fetchall()
                ]
                return query_data

            except Exception as e:
                logger.error("Error getting query with citations: %s", e)
                return {}

    def get_query_trends(self, days: int = 30) -> Dict[str, Any]:
        """Get query trends over time for the last ``days`` days.

        Returns:
            ``{'daily_queries': [(date, count), ...],
            'method_trends': {method: [(date, count), ...]}}``;
            an empty dict on error.
        """
        since_date = (datetime.now() - timedelta(days=days)).isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            try:
                # Queries per day
                cursor.execute('''
                    SELECT DATE(timestamp) as date, COUNT(*) as count
                    FROM queries
                    WHERE timestamp >= ?
                    GROUP BY DATE(timestamp)
                    ORDER BY date
                ''', (since_date,))
                daily_queries = cursor.fetchall()

                # Method usage trends
                cursor.execute('''
                    SELECT DATE(timestamp) as date, retrieval_method,
                           COUNT(*) as count
                    FROM queries
                    WHERE timestamp >= ?
                    GROUP BY DATE(timestamp), retrieval_method
                    ORDER BY date, retrieval_method
                ''', (since_date,))

                method_trends = {}
                for date, method, count in cursor.fetchall():
                    method_trends.setdefault(method, []).append((date, count))

                return {
                    'daily_queries': daily_queries,
                    'method_trends': method_trends,
                }

            except Exception as e:
                logger.error("Error getting query trends: %s", e)
                return {}

    def get_voice_interaction_stats(self) -> Dict[str, Any]:
        """Get statistics about voice interactions.

        A query counts as a voice interaction when its stored
        ``additional_settings`` text matches ``%voice_interaction%`` or its
        ``session_id`` matches ``voice_%``.

        Returns:
            ``{'total_voice_queries', 'voice_by_method',
            'avg_voice_response_time'}``; an empty dict on any error,
            including a failure to open the database.
        """
        # NOTE(review): the LIKE pattern matches any mention of
        # "voice_interaction" in the settings JSON, whatever its value, and
        # "_" is a single-character wildcard in LIKE - confirm this is the
        # intended filter.
        try:
            # The connection is opened inside the ``try`` so a failed
            # connect is logged and reported as {}; ``closing`` only closes
            # a connection that was actually created.
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()

                # Count voice interactions
                cursor.execute('''
                    SELECT COUNT(*) as total_voice_queries
                    FROM queries
                    WHERE additional_settings LIKE '%voice_interaction%'
                       OR session_id LIKE 'voice_%'
                ''')
                result = cursor.fetchone()
                total_voice = result[0] if result else 0

                # Get voice queries by method
                cursor.execute('''
                    SELECT retrieval_method, COUNT(*) as count
                    FROM queries
                    WHERE additional_settings LIKE '%voice_interaction%'
                       OR session_id LIKE 'voice_%'
                    GROUP BY retrieval_method
                ''')
                voice_by_method = dict(cursor.fetchall())

                # Average response time for voice queries
                cursor.execute('''
                    SELECT AVG(response_time_ms) as avg_response_time
                    FROM queries
                    WHERE (additional_settings LIKE '%voice_interaction%'
                           OR session_id LIKE 'voice_%')
                      AND response_time_ms IS NOT NULL
                ''')
                result = cursor.fetchone()
                avg_response_time = result[0] if result and result[0] else 0

                return {
                    'total_voice_queries': total_voice,
                    'voice_by_method': voice_by_method,
                    'avg_voice_response_time': avg_response_time,
                }

        except Exception as e:
            logger.error("Error getting voice interaction stats: %s", e)
            return {}


# Global instance
analytics_db = AnalyticsDB()


# Convenience functions
def log_query(user_query: str, method: str, answer: str,
              citations: Optional[List[Dict]], **kwargs) -> Optional[int]:
    """Log a query to the analytics database.

    Extra keyword arguments are forwarded to ``AnalyticsDB.log_query``.
    Returns the new query ID, or ``None`` if logging failed.
    """
    return analytics_db.log_query(user_query, method, answer, citations, **kwargs)


def get_analytics_stats(days: int = 30) -> Dict[str, Any]:
    """Get analytics statistics."""
    return analytics_db.get_query_stats(days)


def get_method_performance() -> Dict[str, Dict[str, float]]:
    """Get method performance metrics."""
    return analytics_db.get_method_performance()