Spaces:
Paused
Paused
""" | |
Analytics database module for query logging and performance tracking.
Records every query, retrieval method, answer, and citation for later analysis.
""" | |
import sqlite3 | |
import json | |
import time | |
from datetime import datetime, timedelta | |
from typing import List, Dict, Any, Optional, Tuple | |
from pathlib import Path | |
import logging | |
from config import DATA_DIR | |
logger = logging.getLogger(name=__name__)

# SQLite file that holds the query / citation analytics tables.
ANALYTICS_DB = DATA_DIR / "analytics.db"
class AnalyticsDB:
    """Database manager for query analytics and logging.

    Stores one row per answered query in ``queries`` and one row per cited
    source in ``citations``; a ``performance_metrics`` table is also created
    (this class only defines its schema, it never writes to it). Every method
    opens its own short-lived connection and closes it before returning.
    """

    def __init__(self, db_path=None):
        """Bind to the analytics database and make sure its schema exists.

        Args:
            db_path: SQLite file to use (str or path-like). Defaults to the
                module-level ``ANALYTICS_DB`` when omitted.
        """
        self.db_path = ANALYTICS_DB if db_path is None else db_path
        self._init_database()

    def _connect(self) -> sqlite3.Connection:
        """Open a fresh connection to ``self.db_path``."""
        return sqlite3.connect(self.db_path)

    @staticmethod
    def _since(days: int) -> str:
        """Return the ISO-8601 timestamp ``days`` days before now.

        Uses naive local time, the same clock ``log_query`` stamps rows with,
        so the TEXT comparison ``timestamp >= ?`` compares like with like.
        """
        return (datetime.now() - timedelta(days=days)).isoformat()

    @staticmethod
    def _top_keywords(queries: List[str], limit: int = 10) -> Dict[str, int]:
        """Return the ``limit`` most frequent lower-cased words longer than 3 chars.

        Words with equal counts keep the order in which they were first seen.
        """
        from collections import Counter  # local: only the keyword stats need it

        counts = Counter(
            word
            for query in queries
            for word in query.lower().split()
            if len(word) > 3
        )
        return dict(counts.most_common(limit))

    @staticmethod
    def _is_voice_interaction(settings_json: Optional[str],
                              session_id: Optional[str]) -> bool:
        """Decide whether a stored query row was a voice interaction.

        True when ``session_id`` starts with ``"voice_"`` or the JSON-encoded
        ``additional_settings`` has a truthy top-level ``voice_interaction``.
        """
        if session_id and session_id.startswith('voice_'):
            return True
        if not settings_json:
            return False
        try:
            settings = json.loads(settings_json)
        except (TypeError, ValueError):
            return False
        return isinstance(settings, dict) and bool(settings.get('voice_interaction'))

    def _init_database(self):
        """Create the analytics tables and their lookup indexes if missing."""
        # sqlite3.connect cannot create missing parent directories by itself.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Main queries table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS queries (
                    query_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    user_query TEXT NOT NULL,
                    retrieval_method TEXT NOT NULL,
                    answer TEXT NOT NULL,
                    response_time_ms REAL,
                    num_citations INTEGER DEFAULT 0,
                    image_path TEXT,
                    error_message TEXT,
                    top_k_used INTEGER DEFAULT 5,
                    additional_settings TEXT,
                    answer_length INTEGER,
                    session_id TEXT
                )
            ''')
            # Citations table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS citations (
                    citation_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    source TEXT NOT NULL,
                    citation_type TEXT,
                    relevance_score REAL,
                    bm25_score REAL,
                    rerank_score REAL,
                    similarity_score REAL,
                    url TEXT,
                    path TEXT,
                    rank INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')
            # Performance metrics table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    retrieval_time_ms REAL,
                    generation_time_ms REAL,
                    total_time_ms REAL,
                    chunks_retrieved INTEGER,
                    tokens_estimated INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')
            # The stats/trend queries filter on timestamp and the citation
            # lookups/joins go through query_id; index both to avoid full scans.
            cursor.execute(
                'CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON queries (timestamp)'
            )
            cursor.execute(
                'CREATE INDEX IF NOT EXISTS idx_citations_query_id ON citations (query_id)'
            )
            conn.commit()
        finally:
            conn.close()
        logger.info("Analytics database initialized")

    def log_query(self, user_query: str, method: str, answer: str,
                  citations: Optional[List[Dict]], response_time: Optional[float] = None,
                  image_path: Optional[str] = None, error_message: Optional[str] = None,
                  top_k: int = 5, additional_settings: Optional[Dict] = None,
                  session_id: Optional[str] = None) -> Optional[int]:
        """
        Log a complete query interaction.

        The query row and all of its citation rows are written in a single
        transaction; on any failure the transaction is rolled back.

        Args:
            user_query: The user's question
            method: Retrieval method used
            answer: Generated answer
            citations: List of citation dictionaries (``None`` is treated as
                no citations). Keys read: source, type, relevance_score,
                bm25_score, rerank_score, similarity_score, url, path.
            response_time: Time taken in milliseconds
            image_path: Path to uploaded image (if any)
            error_message: Error message (if any)
            top_k: Number of chunks retrieved
            additional_settings: Method-specific settings (stored as JSON)
            session_id: Session identifier
        Returns:
            query_id: The ID of the logged query, or ``None`` if logging failed.
        """
        citations = citations or []
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Insert main query record
            cursor.execute('''
                INSERT INTO queries (
                    timestamp, user_query, retrieval_method, answer,
                    response_time_ms, num_citations, image_path, error_message,
                    top_k_used, additional_settings, answer_length, session_id
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                user_query,
                method,
                answer,
                response_time,
                len(citations),
                image_path,
                error_message,
                top_k,
                json.dumps(additional_settings) if additional_settings else None,
                len(answer),
                session_id,
            ))
            query_id = cursor.lastrowid
            # Insert citations; rank is the 1-based position in the input list.
            citation_rows = [
                (
                    query_id,
                    citation.get('source', ''),
                    citation.get('type', ''),
                    citation.get('relevance_score'),
                    citation.get('bm25_score'),
                    citation.get('rerank_score'),
                    citation.get('similarity_score'),
                    citation.get('url'),
                    citation.get('path'),
                    rank,
                )
                for rank, citation in enumerate(citations, 1)
            ]
            cursor.executemany('''
                INSERT INTO citations (
                    query_id, source, citation_type, relevance_score,
                    bm25_score, rerank_score, similarity_score, url, path, rank
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', citation_rows)
            conn.commit()
            logger.info(f"Logged query {query_id} with {len(citations)} citations")
            return query_id
        except Exception as e:
            # Best-effort logging: analytics failures must not break the caller.
            logger.error(f"Error logging query: {e}")
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_query_stats(self, days: int = 30) -> Dict[str, Any]:
        """Get comprehensive query statistics for the last ``days`` days.

        Returns:
            Dict with ``total_queries``, ``method_usage`` (method -> count),
            ``avg_response_times`` (method -> mean ms), ``avg_citations``,
            ``total_citations``, ``citation_types`` (type -> count),
            ``error_rate`` (percent of queries with an error message) and
            ``top_keywords`` (word -> count, at most 10). ``{}`` on error.
        """
        since_date = self._since(days)
        conn = self._connect()
        try:
            cursor = conn.cursor()
            stats: Dict[str, Any] = {}
            # Total queries
            cursor.execute('''
                SELECT COUNT(*) FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            stats['total_queries'] = cursor.fetchone()[0]
            # Method usage
            cursor.execute('''
                SELECT retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY retrieval_method
                ORDER BY count DESC
            ''', (since_date,))
            stats['method_usage'] = dict(cursor.fetchall())
            # Average response times by method
            cursor.execute('''
                SELECT retrieval_method, AVG(response_time_ms) as avg_time
                FROM queries
                WHERE timestamp >= ? AND response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''', (since_date,))
            stats['avg_response_times'] = dict(cursor.fetchall())
            # Citation statistics (AVG/SUM are NULL on an empty window)
            cursor.execute('''
                SELECT AVG(num_citations) as avg_citations,
                       SUM(num_citations) as total_citations
                FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            avg_citations, total_citations = cursor.fetchone()
            stats['avg_citations'] = avg_citations or 0
            stats['total_citations'] = total_citations or 0
            # Citation types
            cursor.execute('''
                SELECT c.citation_type, COUNT(*) as count
                FROM citations c
                JOIN queries q ON c.query_id = q.query_id
                WHERE q.timestamp >= ?
                GROUP BY c.citation_type
                ORDER BY count DESC
            ''', (since_date,))
            stats['citation_types'] = dict(cursor.fetchall())
            # Error rate
            cursor.execute('''
                SELECT
                    COUNT(CASE WHEN error_message IS NOT NULL THEN 1 END) as errors,
                    COUNT(*) as total
                FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            errors, total = cursor.fetchone()
            stats['error_rate'] = (errors / total) * 100 if total > 0 else 0
            # Most common query topics (simple word analysis)
            cursor.execute('''
                SELECT user_query FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            stats['top_keywords'] = self._top_keywords(
                [row[0] for row in cursor.fetchall()]
            )
            return stats
        except Exception as e:
            logger.error(f"Error getting query stats: {e}")
            return {}
        finally:
            conn.close()

    def get_method_performance(self) -> Dict[str, Dict[str, float]]:
        """Get all-time averages per retrieval method.

        Only rows with a recorded ``response_time_ms`` are included, for every
        metric (including ``query_count``). ``{}`` on error.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                    retrieval_method,
                    AVG(response_time_ms) as avg_response_time,
                    AVG(num_citations) as avg_citations,
                    AVG(answer_length) as avg_answer_length,
                    COUNT(*) as query_count
                FROM queries
                WHERE response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''')
            return {
                method: {
                    'avg_response_time': avg_time,
                    'avg_citations': avg_cites,
                    'avg_answer_length': avg_length,
                    'query_count': count,
                }
                for method, avg_time, avg_cites, avg_length, count in cursor.fetchall()
            }
        except Exception as e:
            logger.error(f"Error getting method performance: {e}")
            return {}
        finally:
            conn.close()

    def get_recent_queries(self, limit: int = 20, include_answers: bool = True) -> List[Dict[str, Any]]:
        """Get the newest ``limit`` queries, optionally including full answers.

        Rows are newest first; ``query_id`` breaks ties between equal
        timestamps. ``[]`` on error.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            if include_answers:
                cursor.execute('''
                    SELECT query_id, timestamp, user_query, retrieval_method,
                           answer, answer_length, num_citations, response_time_ms, error_message
                    FROM queries
                    ORDER BY timestamp DESC, query_id DESC
                    LIMIT ?
                ''', (limit,))
                columns = ['query_id', 'timestamp', 'query', 'method',
                           'answer', 'answer_length', 'citations', 'response_time', 'error_message']
            else:
                cursor.execute('''
                    SELECT query_id, timestamp, user_query, retrieval_method,
                           answer_length, num_citations, response_time_ms
                    FROM queries
                    ORDER BY timestamp DESC, query_id DESC
                    LIMIT ?
                ''', (limit,))
                columns = ['query_id', 'timestamp', 'query', 'method',
                           'answer_length', 'citations', 'response_time']
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting recent queries: {e}")
            return []
        finally:
            conn.close()

    def get_query_with_citations(self, query_id: int) -> Dict[str, Any]:
        """Get full query details including its citations ordered by rank.

        Returns ``{}`` when the query does not exist or on error.
        """
        query_keys = ('query_id', 'timestamp', 'user_query', 'method', 'answer',
                      'response_time', 'num_citations', 'error_message', 'top_k_used')
        citation_keys = ('source', 'type', 'relevance_score', 'bm25_score',
                         'rerank_score', 'similarity_score', 'url', 'path', 'rank')
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Get query details
            cursor.execute('''
                SELECT query_id, timestamp, user_query, retrieval_method, answer,
                       response_time_ms, num_citations, error_message, top_k_used
                FROM queries WHERE query_id = ?
            ''', (query_id,))
            query_row = cursor.fetchone()
            if not query_row:
                return {}
            query_data = dict(zip(query_keys, query_row))
            # Get citations
            cursor.execute('''
                SELECT source, citation_type, relevance_score, bm25_score,
                       rerank_score, similarity_score, url, path, rank
                FROM citations WHERE query_id = ?
                ORDER BY rank
            ''', (query_id,))
            query_data['citations'] = [dict(zip(citation_keys, row))
                                       for row in cursor.fetchall()]
            return query_data
        except Exception as e:
            logger.error(f"Error getting query with citations: {e}")
            return {}
        finally:
            conn.close()

    def get_query_trends(self, days: int = 30) -> Dict[str, Any]:
        """Get per-day query counts over the last ``days`` days.

        Returns:
            ``{'daily_queries': [(date, count), ...],
            'method_trends': {method: [(date, count), ...]}}`` with ``date`` as
            SQLite ``DATE()`` strings in ascending order; ``{}`` on error.
        """
        since_date = self._since(days)
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Queries per day
            cursor.execute('''
                SELECT DATE(timestamp) as date, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp)
                ORDER BY date
            ''', (since_date,))
            daily_queries = cursor.fetchall()
            # Method usage trends
            cursor.execute('''
                SELECT DATE(timestamp) as date, retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp), retrieval_method
                ORDER BY date, retrieval_method
            ''', (since_date,))
            method_trends: Dict[str, List[Tuple[str, int]]] = {}
            for date, method, count in cursor.fetchall():
                method_trends.setdefault(method, []).append((date, count))
            return {
                'daily_queries': daily_queries,
                'method_trends': method_trends
            }
        except Exception as e:
            logger.error(f"Error getting query trends: {e}")
            return {}
        finally:
            conn.close()

    def get_voice_interaction_stats(self) -> Dict[str, Any]:
        """Get statistics about voice interactions (all time).

        A query counts as a voice interaction when its ``session_id`` starts
        with ``"voice_"`` or its JSON ``additional_settings`` has a truthy
        top-level ``voice_interaction`` key.

        Returns:
            ``total_voice_queries``, ``voice_by_method`` (method -> count) and
            ``avg_voice_response_time`` (mean ms, 0 when none recorded);
            ``{}`` on any database error, including failure to connect.
        """
        conn = None  # so `finally` is safe even if connect() itself fails
        try:
            conn = self._connect()
            cursor = conn.cursor()
            # Coarse SQL pre-filter only: LIKE is case-insensitive and treats
            # '_' as a wildcard, and it also matches "voice_interaction": false,
            # so the exact decision is made in Python below.
            cursor.execute('''
                SELECT retrieval_method, response_time_ms, additional_settings, session_id
                FROM queries
                WHERE additional_settings LIKE '%voice_interaction%'
                   OR session_id LIKE 'voice_%'
            ''')
            total_voice = 0
            voice_by_method: Dict[str, int] = {}
            response_times: List[float] = []
            for method, response_time, settings_json, session_id in cursor.fetchall():
                if not self._is_voice_interaction(settings_json, session_id):
                    continue
                total_voice += 1
                voice_by_method[method] = voice_by_method.get(method, 0) + 1
                if response_time is not None:
                    response_times.append(response_time)
            avg_response_time = (sum(response_times) / len(response_times)
                                 if response_times else 0)
            return {
                'total_voice_queries': total_voice,
                'voice_by_method': voice_by_method,
                'avg_voice_response_time': avg_response_time
            }
        except Exception as e:
            logger.error(f"Error getting voice interaction stats: {e}")
            return {}
        finally:
            if conn is not None:
                conn.close()
# Shared module-level instance; constructing it creates the schema at import time.
analytics_db: AnalyticsDB = AnalyticsDB()
# Convenience functions
def log_query(user_query: str, method: str, answer: str, citations: List[Dict],
              **kwargs) -> Optional[int]:
    """Log a query to the shared analytics database.

    Thin wrapper around ``analytics_db.log_query``; ``kwargs`` are forwarded
    unchanged (``response_time``, ``image_path``, ``error_message``,
    ``top_k``, ``additional_settings``, ``session_id``).

    Returns:
        The new ``query_id``, or ``None`` if logging failed.
    """
    return analytics_db.log_query(user_query, method, answer, citations, **kwargs)
def get_analytics_stats(days: int = 30) -> Dict[str, Any]:
    """Return query statistics for the last ``days`` days from the shared instance."""
    stats = analytics_db.get_query_stats(days=days)
    return stats
def get_method_performance() -> Dict[str, Dict[str, float]]:
    """Return per-method averages from the shared analytics instance."""
    performance = analytics_db.get_method_performance()
    return performance