# sight_chat / analytics_db.py
# fmegahed's picture
# version 2.0.0
# ef821d9 verified
"""
Analytics Database Module for Query Logging and Performance Tracking.
Tracks every query, method, answer, and citation for comprehensive analytics.
"""
import sqlite3
import json
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import logging
from config import DATA_DIR
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Database path: the SQLite analytics file lives directly under DATA_DIR.
ANALYTICS_DB = DATA_DIR / "analytics.db"
class AnalyticsDB:
    """Database manager for query analytics and logging.

    Each public method opens its own short-lived SQLite connection to
    ``self.db_path`` and closes it before returning. Read methods log any
    error and return an empty container instead of raising.
    """

    # SQL predicate selecting voice interactions. ``_`` is a single-character
    # wildcard in LIKE, so literal underscores are escaped with ``!``;
    # otherwise a session id such as ``voicemail7`` would match ``voice_%``.
    # Note: the settings check only looks for the key name, whatever its value.
    _VOICE_FILTER = (
        "(additional_settings LIKE '%voice!_interaction%' ESCAPE '!' "
        "OR session_id LIKE 'voice!_%' ESCAPE '!')"
    )

    def __init__(self, db_path: Optional[Path] = None):
        """Create the manager and make sure the schema exists.

        Args:
            db_path: Location of the SQLite file. Defaults to the module-level
                ``ANALYTICS_DB`` when omitted.
        """
        self.db_path = ANALYTICS_DB if db_path is None else db_path
        self._init_database()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the analytics database."""
        return sqlite3.connect(self.db_path)

    def _init_database(self):
        """Initialize analytics database with required tables."""
        # sqlite3 creates the file but not its directory, so create that first.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Main queries table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS queries (
                    query_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    user_query TEXT NOT NULL,
                    retrieval_method TEXT NOT NULL,
                    answer TEXT NOT NULL,
                    response_time_ms REAL,
                    num_citations INTEGER DEFAULT 0,
                    image_path TEXT,
                    error_message TEXT,
                    top_k_used INTEGER DEFAULT 5,
                    additional_settings TEXT,
                    answer_length INTEGER,
                    session_id TEXT
                )
            ''')

            # Citations table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS citations (
                    citation_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    source TEXT NOT NULL,
                    citation_type TEXT,
                    relevance_score REAL,
                    bm25_score REAL,
                    rerank_score REAL,
                    similarity_score REAL,
                    url TEXT,
                    path TEXT,
                    rank INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')

            # Performance metrics table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query_id INTEGER NOT NULL,
                    retrieval_time_ms REAL,
                    generation_time_ms REAL,
                    total_time_ms REAL,
                    chunks_retrieved INTEGER,
                    tokens_estimated INTEGER,
                    FOREIGN KEY (query_id) REFERENCES queries (query_id)
                )
            ''')

            conn.commit()
        finally:
            # Close even when a CREATE statement fails.
            conn.close()

        logger.info("Analytics database initialized")

    def log_query(self, user_query: str, method: str, answer: str,
                  citations: List[Dict], response_time: Optional[float] = None,
                  image_path: Optional[str] = None,
                  error_message: Optional[str] = None,
                  top_k: int = 5, additional_settings: Optional[Dict] = None,
                  session_id: Optional[str] = None) -> Optional[int]:
        """
        Log a complete query interaction.

        Args:
            user_query: The user's question
            method: Retrieval method used
            answer: Generated answer (``None`` is stored as an empty string)
            citations: List of citation dictionaries (``None`` means none)
            response_time: Time taken in milliseconds
            image_path: Path to uploaded image (if any)
            error_message: Error message (if any)
            top_k: Number of chunks retrieved
            additional_settings: Method-specific settings, stored as JSON
            session_id: Session identifier

        Returns:
            query_id: The ID of the logged query, or ``None`` if logging
            failed (the error is logged and the transaction rolled back).
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Failed interactions may have no answer or citations. Normalise
            # them so the record is still written instead of being dropped.
            answer = "" if answer is None else answer
            citations = [] if citations is None else citations

            # Insert main query record
            cursor.execute('''
                INSERT INTO queries (
                    timestamp, user_query, retrieval_method, answer,
                    response_time_ms, num_citations, image_path, error_message,
                    top_k_used, additional_settings, answer_length, session_id
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                user_query,
                method,
                answer,
                response_time,
                len(citations),
                image_path,
                error_message,
                top_k,
                json.dumps(additional_settings) if additional_settings else None,
                len(answer),
                session_id,
            ))
            query_id = cursor.lastrowid

            # Insert citations; rank is the 1-based position in the list.
            citation_rows = [
                (
                    query_id,
                    # ``source`` is NOT NULL, so an explicit None becomes ''.
                    citation.get('source') or '',
                    citation.get('type', ''),
                    citation.get('relevance_score'),
                    citation.get('bm25_score'),
                    citation.get('rerank_score'),
                    citation.get('similarity_score'),
                    citation.get('url'),
                    citation.get('path'),
                    rank,
                )
                for rank, citation in enumerate(citations, 1)
            ]
            cursor.executemany('''
                INSERT INTO citations (
                    query_id, source, citation_type, relevance_score,
                    bm25_score, rerank_score, similarity_score, url, path, rank
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', citation_rows)

            conn.commit()
            logger.info(f"Logged query {query_id} with {len(citations)} citations")
            return query_id
        except Exception as e:
            logger.error(f"Error logging query: {e}")
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_query_stats(self, days: int = 30) -> Dict[str, Any]:
        """Get comprehensive query statistics for the last ``days`` days.

        Returns:
            A dict with ``total_queries``, ``method_usage``,
            ``avg_response_times``, ``avg_citations``, ``total_citations``,
            ``citation_types``, ``error_rate`` (percent) and ``top_keywords``;
            an empty dict if any query fails.
        """
        # Computed before connecting so a bad ``days`` cannot leak a connection.
        since_date = (datetime.now() - timedelta(days=days)).isoformat()
        window = (since_date,)

        conn = self._connect()
        try:
            cursor = conn.cursor()
            stats: Dict[str, Any] = {}

            # Total queries
            cursor.execute('''
                SELECT COUNT(*) FROM queries
                WHERE timestamp >= ?
            ''', window)
            stats['total_queries'] = cursor.fetchone()[0]

            # Method usage
            cursor.execute('''
                SELECT retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY retrieval_method
                ORDER BY count DESC
            ''', window)
            stats['method_usage'] = dict(cursor.fetchall())

            # Average response times by method
            cursor.execute('''
                SELECT retrieval_method, AVG(response_time_ms) as avg_time
                FROM queries
                WHERE timestamp >= ? AND response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''', window)
            stats['avg_response_times'] = dict(cursor.fetchall())

            # Citation statistics (aggregates are NULL when no rows match)
            cursor.execute('''
                SELECT AVG(num_citations) as avg_citations,
                       SUM(num_citations) as total_citations
                FROM queries
                WHERE timestamp >= ?
            ''', window)
            avg_citations, total_citations = cursor.fetchone()
            stats['avg_citations'] = avg_citations or 0
            stats['total_citations'] = total_citations or 0

            # Citation types
            cursor.execute('''
                SELECT c.citation_type, COUNT(*) as count
                FROM citations c
                JOIN queries q ON c.query_id = q.query_id
                WHERE q.timestamp >= ?
                GROUP BY c.citation_type
                ORDER BY count DESC
            ''', window)
            stats['citation_types'] = dict(cursor.fetchall())

            # Error rate, as a percentage of queries in the window
            cursor.execute('''
                SELECT
                    COUNT(CASE WHEN error_message IS NOT NULL THEN 1 END) as errors,
                    COUNT(*) as total
                FROM queries
                WHERE timestamp >= ?
            ''', window)
            errors, total = cursor.fetchone()
            stats['error_rate'] = (errors / total) * 100 if total > 0 else 0

            # Most common query topics (simple word analysis): count every
            # lower-cased, whitespace-separated word longer than 3 characters.
            cursor.execute('''
                SELECT user_query FROM queries
                WHERE timestamp >= ?
            ''', window)
            keywords: Dict[str, int] = {}
            for (text,) in cursor.fetchall():
                for word in text.lower().split():
                    if len(word) > 3:
                        keywords[word] = keywords.get(word, 0) + 1

            # Top 10 keywords (stable sort keeps first-seen order on ties)
            ranked = sorted(keywords.items(), key=lambda item: item[1], reverse=True)
            stats['top_keywords'] = dict(ranked[:10])

            return stats
        except Exception as e:
            logger.error(f"Error getting query stats: {e}")
            return {}
        finally:
            conn.close()

    def get_method_performance(self) -> Dict[str, Dict[str, float]]:
        """Get detailed performance metrics by method.

        Only queries with a recorded response time are included.

        Returns:
            Mapping of retrieval method to ``avg_response_time``,
            ``avg_citations``, ``avg_answer_length`` and ``query_count``;
            an empty dict on error.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                    retrieval_method,
                    AVG(response_time_ms) as avg_response_time,
                    AVG(num_citations) as avg_citations,
                    AVG(answer_length) as avg_answer_length,
                    COUNT(*) as query_count
                FROM queries
                WHERE response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''')
            metric_names = ('avg_response_time', 'avg_citations',
                            'avg_answer_length', 'query_count')
            return {
                method: dict(zip(metric_names, metrics))
                for method, *metrics in cursor.fetchall()
            }
        except Exception as e:
            logger.error(f"Error getting method performance: {e}")
            return {}
        finally:
            conn.close()

    def get_recent_queries(self, limit: int = 20, include_answers: bool = True) -> List[Dict[str, Any]]:
        """Get recent queries with basic information and optionally full answers.

        Args:
            limit: Maximum number of rows to return, newest first.
            include_answers: When true, each row also carries ``answer`` and
                ``error_message``.

        Returns:
            A list of dicts keyed by ``query_id``, ``timestamp``, ``query``,
            ``method``, ``answer_length``, ``citations`` (the citation count)
            and ``response_time``; an empty list on error.
        """
        if include_answers:
            select_cols = ('query_id, timestamp, user_query, retrieval_method, '
                           'answer, answer_length, num_citations, '
                           'response_time_ms, error_message')
            columns = ['query_id', 'timestamp', 'query', 'method',
                       'answer', 'answer_length', 'citations',
                       'response_time', 'error_message']
        else:
            select_cols = ('query_id, timestamp, user_query, retrieval_method, '
                           'answer_length, num_citations, response_time_ms')
            columns = ['query_id', 'timestamp', 'query', 'method',
                       'answer_length', 'citations', 'response_time']

        conn = self._connect()
        try:
            cursor = conn.cursor()
            # ``select_cols`` is one of the two constants above, never user
            # input. query_id breaks ties between identical timestamps.
            cursor.execute(
                f'''
                SELECT {select_cols}
                FROM queries
                ORDER BY timestamp DESC, query_id DESC
                LIMIT ?
                ''',
                (limit,),
            )
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting recent queries: {e}")
            return []
        finally:
            conn.close()

    def get_query_with_citations(self, query_id: int) -> Dict[str, Any]:
        """Get full query details including citations.

        Returns:
            The query record with a ``citations`` list ordered by rank, or an
            empty dict when the id is unknown or an error occurs.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Get query details
            cursor.execute('''
                SELECT query_id, timestamp, user_query, retrieval_method, answer,
                       response_time_ms, num_citations, error_message, top_k_used
                FROM queries WHERE query_id = ?
            ''', (query_id,))
            query_row = cursor.fetchone()
            if not query_row:
                return {}

            query_keys = ('query_id', 'timestamp', 'user_query', 'method',
                          'answer', 'response_time', 'num_citations',
                          'error_message', 'top_k_used')
            query_data: Dict[str, Any] = dict(zip(query_keys, query_row))

            # Get citations
            cursor.execute('''
                SELECT source, citation_type, relevance_score, bm25_score,
                       rerank_score, similarity_score, url, path, rank
                FROM citations WHERE query_id = ?
                ORDER BY rank
            ''', (query_id,))
            citation_keys = ('source', 'type', 'relevance_score', 'bm25_score',
                             'rerank_score', 'similarity_score', 'url',
                             'path', 'rank')
            query_data['citations'] = [
                dict(zip(citation_keys, row)) for row in cursor.fetchall()
            ]
            return query_data
        except Exception as e:
            logger.error(f"Error getting query with citations: {e}")
            return {}
        finally:
            conn.close()

    def get_query_trends(self, days: int = 30) -> Dict[str, Any]:
        """Get query trends over time for the last ``days`` days.

        Returns:
            ``{'daily_queries': [(date, count), ...],
            'method_trends': {method: [(date, count), ...]}}``, or an empty
            dict on error.
        """
        since_date = (datetime.now() - timedelta(days=days)).isoformat()

        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Queries per day
            cursor.execute('''
                SELECT DATE(timestamp) as date, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp)
                ORDER BY date
            ''', (since_date,))
            daily_queries = cursor.fetchall()

            # Method usage trends
            cursor.execute('''
                SELECT DATE(timestamp) as date, retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp), retrieval_method
                ORDER BY date, retrieval_method
            ''', (since_date,))
            method_trends: Dict[str, List[Tuple[str, int]]] = {}
            for date, method, count in cursor.fetchall():
                method_trends.setdefault(method, []).append((date, count))

            return {
                'daily_queries': daily_queries,
                'method_trends': method_trends,
            }
        except Exception as e:
            logger.error(f"Error getting query trends: {e}")
            return {}
        finally:
            conn.close()

    def get_voice_interaction_stats(self) -> Dict[str, Any]:
        """Get statistics about voice interactions.

        A query counts as a voice interaction when its stored settings mention
        ``voice_interaction`` or its session id starts with ``voice_``.

        Returns:
            ``total_voice_queries``, ``voice_by_method`` and
            ``avg_voice_response_time``; an empty dict on error.
        """
        voice = self._VOICE_FILTER
        # Pre-bound so ``finally`` is safe when the connection cannot be opened.
        conn = None
        try:
            conn = self._connect()
            cursor = conn.cursor()

            # Count voice interactions
            cursor.execute(f'''
                SELECT COUNT(*) as total_voice_queries
                FROM queries
                WHERE {voice}
            ''')
            result = cursor.fetchone()
            total_voice = result[0] if result else 0

            # Get voice queries by method
            cursor.execute(f'''
                SELECT retrieval_method, COUNT(*) as count
                FROM queries
                WHERE {voice}
                GROUP BY retrieval_method
            ''')
            voice_by_method = dict(cursor.fetchall())

            # Average response time for voice queries
            cursor.execute(f'''
                SELECT AVG(response_time_ms) as avg_response_time
                FROM queries
                WHERE {voice}
                AND response_time_ms IS NOT NULL
            ''')
            result = cursor.fetchone()
            avg_response_time = result[0] if result and result[0] else 0

            return {
                'total_voice_queries': total_voice,
                'voice_by_method': voice_by_method,
                'avg_voice_response_time': avg_response_time,
            }
        except Exception as e:
            logger.error(f"Error getting voice interaction stats: {e}")
            return {}
        finally:
            if conn is not None:
                conn.close()
# Global instance shared by the convenience functions below.
# Constructing AnalyticsDB runs _init_database(), so importing this module
# connects to the SQLite file and creates any missing tables.
analytics_db = AnalyticsDB()
# Convenience functions
def log_query(user_query: str, method: str, answer: str, citations: List[Dict],
              **kwargs) -> Optional[int]:
    """Log a query to the analytics database.

    Forwards to ``AnalyticsDB.log_query`` on the shared ``analytics_db``
    instance.

    Args:
        user_query: The user's question.
        method: Retrieval method used.
        answer: Generated answer.
        citations: List of citation dictionaries.
        **kwargs: Optional fields accepted by ``AnalyticsDB.log_query``
            (``response_time``, ``image_path``, ``error_message``, ``top_k``,
            ``additional_settings``, ``session_id``).

    Returns:
        The ID of the logged query, or ``None`` when logging failed (the
        underlying method logs the error and returns ``None`` rather than
        raising).
    """
    return analytics_db.log_query(user_query, method, answer, citations, **kwargs)
def get_analytics_stats(days: int = 30) -> Dict[str, Any]:
    """Return aggregate query statistics for the last ``days`` days.

    Module-level shortcut that forwards to ``get_query_stats`` on the shared
    ``analytics_db`` instance.
    """
    stats = analytics_db.get_query_stats(days=days)
    return stats
def get_method_performance() -> Dict[str, Dict[str, float]]:
    """Return per-method performance metrics.

    Module-level shortcut that forwards to ``get_method_performance`` on the
    shared ``analytics_db`` instance.
    """
    per_method = analytics_db.get_method_performance()
    return per_method