# AI-Life-Coach-Streamlit2/src/analytics/session_analytics.py
# Last commit (rdune71, dc6e56f): Fix critical issues: session state error, Ollama timeouts, redundant logging
import time
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from core.redis_client import redis_client
from src.analytics.user_logger import user_logger
logger = logging.getLogger(__name__)
class SessionAnalytics:
    """Session-level tracking and analytics with proper error handling.

    Each session is stored in Redis as a JSON document under the key
    ``analytics:sessions:<session_id>``. Every public method is best-effort:
    any exception is logged and swallowed, so analytics failures never
    propagate to the caller.
    """

    # Lifetime (seconds) of a session record while the session is active: 24 hours.
    ACTIVE_SESSION_TTL = 86400
    # Lifetime (seconds) of a session record once the session has ended: 30 days.
    COMPLETED_SESSION_TTL = 2592000
    # Minimum gap (seconds) between two recorded interactions of the same
    # type for the same user.
    DEBOUNCE_SECONDS = 1.0
    # Lifetime (seconds) of the per-user / per-type "last event" marker.
    DEBOUNCE_MARKER_TTL = 10

    def __init__(self):
        # May be falsy when Redis is unavailable; every method checks for that.
        self.redis_client = redis_client.get_client()

    @staticmethod
    def _session_key(session_id: str) -> str:
        """Return the Redis key that holds the record for *session_id*."""
        return f"analytics:sessions:{session_id}"

    def _load_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Fetch and decode a session record; return ``None`` if it is absent."""
        raw = self.redis_client.get(self._session_key(session_id))
        return json.loads(raw) if raw else None

    def _is_debounced(self, user_id: str, interaction_type: str) -> bool:
        """Return True when the same event was seen less than DEBOUNCE_SECONDS ago.

        When the event is *not* debounced, the "last event" marker is refreshed
        as a side effect so that the next call is measured against this one.
        """
        marker_key = f"analytics:last_event:{user_id}:{interaction_type}"
        now = time.time()
        previous = self.redis_client.get(marker_key)
        if previous:
            try:
                if now - float(previous) < self.DEBOUNCE_SECONDS:
                    return True
            except (TypeError, ValueError):
                # A corrupt marker must not cost us the interaction itself;
                # treat it as "no previous event" and overwrite it below.
                logger.debug("Ignoring unreadable debounce marker: %s", marker_key)
        self.redis_client.setex(marker_key, self.DEBOUNCE_MARKER_TTL, now)
        return False

    def start_session_tracking(self, user_id: str, session_id: str):
        """Start tracking a user session with error handling.

        Writes a fresh session record (expiring after ACTIVE_SESSION_TTL) and
        logs a ``session_start`` user action.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session tracking")
                return

            session_data = {
                "user_id": user_id,
                "session_id": session_id,
                "start_time": datetime.now().isoformat(),
                "interactions": [],
                "duration": 0,
                "page_views": 0,
                "actions": {},
            }
            self.redis_client.setex(
                self._session_key(session_id),
                self.ACTIVE_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log session start
            user_logger.log_user_action(user_id, "session_start", {
                "session_id": session_id
            })
            logger.info("Started session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to start session tracking: %s", e)

    def track_interaction(self, user_id: str, session_id: str, interaction_type: str,
                          details: Optional[Dict[str, Any]] = None):
        """Track a user interaction within a session with error handling and debouncing.

        Repeated events of the same type from the same user are throttled to
        one per DEBOUNCE_SECONDS. If no record exists for *session_id* the
        call is a no-op. Otherwise the interaction is appended, the per-type
        action counter (and ``page_views`` for ``"page_view"`` events) is
        incremented, the duration is refreshed and the record is re-saved
        with a new ACTIVE_SESSION_TTL expiry.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for interaction tracking")
                return

            if self._is_debounced(user_id, interaction_type):
                return

            session_data = self._load_session(session_id)
            if session_data is None:
                return

            payload = details or {}
            # One timestamp for both the interaction and the duration keeps
            # the stored values mutually consistent.
            now = datetime.now()

            # setdefault/get tolerate records that lack the optional fields
            # instead of dropping the interaction on a KeyError.
            session_data.setdefault("interactions", []).append({
                "type": interaction_type,
                "timestamp": now.isoformat(),
                "details": payload,
            })

            actions = session_data.setdefault("actions", {})
            actions[interaction_type] = actions.get(interaction_type, 0) + 1

            # Update page views for navigation events
            if interaction_type == "page_view":
                session_data["page_views"] = session_data.get("page_views", 0) + 1

            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["duration"] = (now - start_time).total_seconds()

            self.redis_client.setex(
                self._session_key(session_id),
                self.ACTIVE_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log the interaction
            user_logger.log_user_action(user_id, f"session_interaction_{interaction_type}", {
                "session_id": session_id,
                "details": payload
            })
        except Exception as e:
            logger.error("Failed to track interaction: %s", e)

    def end_session_tracking(self, user_id: str, session_id: str):
        """End session tracking and generate summary with error handling.

        Stamps ``end_time`` and the final ``duration`` on the record, re-saves
        it with the longer COMPLETED_SESSION_TTL expiry and logs a
        ``session_end`` user action carrying the summary counters. If no
        record exists for *session_id* the call is a no-op.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session ending")
                return

            session_data = self._load_session(session_id)
            if session_data is None:
                return

            # A single clock reading guarantees end_time - start_time == duration.
            ended_at = datetime.now()
            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["end_time"] = ended_at.isoformat()
            session_data["duration"] = (ended_at - start_time).total_seconds()

            # Save final session data
            self.redis_client.setex(
                self._session_key(session_id),
                self.COMPLETED_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log session end with summary
            user_logger.log_user_action(user_id, "session_end", {
                "session_id": session_id,
                "duration": session_data["duration"],
                "interactions": len(session_data.get("interactions", [])),
                "page_views": session_data.get("page_views", 0),
                "actions": session_data.get("actions", {})
            })
            logger.info("Ended session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to end session tracking: %s", e)

    def get_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session summary data with error handling.

        Returns the decoded session record, or ``None`` when Redis is
        unavailable, the record does not exist, or reading it fails.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session summary")
                return None
            return self._load_session(session_id)
        except Exception as e:
            logger.error("Failed to get session summary for session %s: %s", session_id, e)
            return None

    def get_user_sessions(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent sessions for a user with error handling.

        Not implemented yet: session records are keyed by session id only, so
        listing by user needs an additional index. Always returns ``[]``;
        *limit* is accepted for interface stability but currently unused.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for user sessions")
                return []

            # This would require a more complex indexing system
            # For now, we'll return an empty list as this requires additional implementation
            return []
        except Exception as e:
            logger.error("Failed to get user sessions for user %s: %s", user_id, e)
            return []
# Global instance, created when this module is imported.
# NOTE: construction runs SessionAnalytics.__init__, which calls
# redis_client.get_client() once and keeps the result on the instance.
session_analytics = SessionAnalytics()