import time
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime

from core.redis_client import redis_client
from src.analytics.user_logger import user_logger

logger = logging.getLogger(__name__)


class SessionAnalytics:
    """Session-level tracking and analytics with proper error handling.

    Each session is stored as a JSON document under the key
    ``analytics:sessions:<session_id>`` via the client returned by
    ``redis_client.get_client()``. Every public method is best-effort: any
    exception is logged and swallowed so analytics never break the caller.
    """

    # Expiry (seconds) applied to a session document while it is being updated.
    ACTIVE_SESSION_TTL = 86400  # 24 hours
    # Expiry (seconds) applied once a session has been ended.
    COMPLETED_SESSION_TTL = 2592000  # 30 days
    # Expiry (seconds) of the per-user/per-type "last event" debounce marker.
    DEBOUNCE_KEY_TTL = 10
    # Minimum gap (seconds) between two recorded events of the same type.
    DEBOUNCE_WINDOW_SECONDS = 1.0

    def __init__(self):
        self.redis_client = redis_client.get_client()

    @staticmethod
    def _session_key(session_id: str) -> str:
        """Return the storage key of the session document for *session_id*."""
        return f"analytics:sessions:{session_id}"

    def start_session_tracking(self, user_id: str, session_id: str):
        """Create the session document and log a ``session_start`` action.

        Args:
            user_id: Identifier of the user owning the session.
            session_id: Identifier of the session to start tracking.

        Returns:
            None. Does nothing (beyond a warning) when no client is available;
            failures are logged, never raised.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session tracking")
                return

            session_data = {
                "user_id": user_id,
                "session_id": session_id,
                "start_time": datetime.now().isoformat(),
                "interactions": [],
                "duration": 0,
                "page_views": 0,
                "actions": {},
            }
            self.redis_client.setex(
                self._session_key(session_id),
                self.ACTIVE_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log session start
            user_logger.log_user_action(user_id, "session_start", {
                "session_id": session_id
            })

            logger.info("Started session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to start session tracking: %s", e)

    def track_interaction(self, user_id: str, session_id: str, interaction_type: str,
                          details: Optional[Dict[str, Any]] = None):
        """Record one interaction in a session, debounced per user and type.

        An event is dropped when another event of the same ``interaction_type``
        was recorded for the same ``user_id`` less than
        ``DEBOUNCE_WINDOW_SECONDS`` ago. Otherwise the interaction is appended
        to the stored session, the per-type counter, page-view counter and
        duration are updated, and the document is saved again with
        ``ACTIVE_SESSION_TTL``.

        Args:
            user_id: Identifier of the acting user (also scopes the debounce).
            session_id: Identifier of the session to update.
            interaction_type: Event name; ``"page_view"`` also bumps
                ``page_views``.
            details: Optional payload stored with the interaction; ``None`` is
                stored as an empty dict.

        Returns:
            None. Failures are logged, never raised.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for interaction tracking")
                return

            # Debounce repeated events - only record if the previous event of
            # the same type is at least DEBOUNCE_WINDOW_SECONDS old.
            last_event_key = f"analytics:last_event:{user_id}:{interaction_type}"
            last_event_time = self.redis_client.get(last_event_key)
            if last_event_time:
                if time.time() - float(last_event_time) < self.DEBOUNCE_WINDOW_SECONDS:
                    return

            # Update last event time; the marker expires on its own.
            self.redis_client.setex(last_event_key, self.DEBOUNCE_KEY_TTL, time.time())

            key = self._session_key(session_id)
            session_data_str = self.redis_client.get(key)
            if session_data_str:
                session_data = json.loads(session_data_str)
                payload = details or {}
                # One clock reading so the interaction timestamp and the
                # session duration describe the same instant.
                now = datetime.now()

                # Add interaction
                session_data["interactions"].append({
                    "type": interaction_type,
                    "timestamp": now.isoformat(),
                    "details": payload,
                })

                # Update action counts
                actions = session_data["actions"]
                actions[interaction_type] = actions.get(interaction_type, 0) + 1

                # Update page views for navigation events
                if interaction_type == "page_view":
                    session_data["page_views"] += 1

                # Update duration
                start_time = datetime.fromisoformat(session_data["start_time"])
                session_data["duration"] = (now - start_time).total_seconds()

                # Save updated session data
                self.redis_client.setex(key, self.ACTIVE_SESSION_TTL, json.dumps(session_data))

                # Log the interaction
                # NOTE(review): the original indentation was lost; this call is
                # assumed to belong to the "session found" branch - confirm.
                user_logger.log_user_action(user_id, f"session_interaction_{interaction_type}", {
                    "session_id": session_id,
                    "details": payload
                })
        except Exception as e:
            logger.error("Failed to track interaction: %s", e)

    def end_session_tracking(self, user_id: str, session_id: str):
        """Finalize a session and log a ``session_end`` summary.

        Sets ``end_time`` and the final ``duration`` on the stored session,
        saves it with ``COMPLETED_SESSION_TTL`` and logs a summary containing
        the duration, interaction count, page views and per-type action counts.
        Nothing is written when the session document does not exist.

        Args:
            user_id: Identifier of the user owning the session.
            session_id: Identifier of the session to end.

        Returns:
            None. Failures are logged, never raised.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session ending")
                return

            key = self._session_key(session_id)
            session_data_str = self.redis_client.get(key)
            if session_data_str:
                session_data = json.loads(session_data_str)

                # Update final duration; a single clock reading keeps
                # end_time and duration consistent with each other.
                ended_at = datetime.now()
                start_time = datetime.fromisoformat(session_data["start_time"])
                session_data["end_time"] = ended_at.isoformat()
                session_data["duration"] = (ended_at - start_time).total_seconds()

                # Save final session data (longer expiry for completed sessions)
                self.redis_client.setex(key, self.COMPLETED_SESSION_TTL, json.dumps(session_data))

                # Log session end with summary
                user_logger.log_user_action(user_id, "session_end", {
                    "session_id": session_id,
                    "duration": session_data["duration"],
                    "interactions": len(session_data["interactions"]),
                    "page_views": session_data["page_views"],
                    "actions": session_data["actions"]
                })

                # NOTE(review): assumed to be inside the "session found"
                # branch; the original indentation was lost - confirm.
                logger.info("Ended session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to end session tracking: %s", e)

    def get_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored session document, or ``None``.

        Args:
            session_id: Identifier of the session to look up.

        Returns:
            The decoded session dict, or ``None`` when no client is available,
            the session is not stored, or an error occurred (which is logged).
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session summary")
                return None

            session_data_str = self.redis_client.get(self._session_key(session_id))
            if session_data_str:
                return json.loads(session_data_str)
            return None
        except Exception as e:
            # The identifier here is a session id, so the message says so.
            logger.error("Failed to get session summary for session %s: %s", session_id, e)
            return None

    def get_user_sessions(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return recent sessions for a user (not implemented yet).

        Args:
            user_id: Identifier of the user whose sessions are requested.
            limit: Maximum number of sessions to return; currently unused.

        Returns:
            Always an empty list for now.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for user sessions")
                return []

            # This would require a more complex indexing system.
            # For now, return an empty list as this requires additional implementation.
            return []
        except Exception as e:
            logger.error("Failed to get user sessions for user %s: %s", user_id, e)
            return []


# Global instance
session_analytics = SessionAnalytics()