File size: 7,153 Bytes
fc3fdb8 8da6264 dc6e56f fc3fdb8 dc6e56f fc3fdb8 8da6264 fc3fdb8 8da6264 dc6e56f fc3fdb8 dc6e56f fc3fdb8 8da6264 dc6e56f fc3fdb8 dc6e56f fc3fdb8 dc6e56f fc3fdb8 dc6e56f fc3fdb8 8da6264 fc3fdb8 8da6264 dc6e56f fc3fdb8 dc6e56f fc3fdb8 dc6e56f fc3fdb8 8da6264 fc3fdb8 8da6264 dc6e56f fc3fdb8 8da6264 fc3fdb8 dc6e56f fc3fdb8 8da6264 fc3fdb8 8da6264 dc6e56f fc3fdb8 8da6264 fc3fdb8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
import time
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from core.redis_client import redis_client
from src.analytics.user_logger import user_logger
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SessionAnalytics:
    """Session-level tracking and analytics stored in Redis.

    Each session is kept as a single JSON document under
    ``analytics:sessions:<session_id>``. Every public method is best-effort:
    any exception is logged and swallowed, and a missing Redis client only
    produces a warning, so analytics never propagates an error to the caller.
    """

    # Key layout and retention windows (all durations in seconds).
    SESSION_KEY_PREFIX = "analytics:sessions:"
    LAST_EVENT_KEY_PREFIX = "analytics:last_event:"
    ACTIVE_SESSION_TTL = 86400  # 24 hours while a session is in progress
    COMPLETED_SESSION_TTL = 2592000  # 30 days once a session has ended
    DEBOUNCE_WINDOW = 1.0  # min. seconds between same-type events per user
    DEBOUNCE_KEY_TTL = 10  # debounce markers expire after 10 seconds

    def __init__(self):
        self.redis_client = redis_client.get_client()

    def _session_key(self, session_id: str) -> str:
        """Return the Redis key holding the JSON document for *session_id*."""
        return f"{self.SESSION_KEY_PREFIX}{session_id}"

    def _load_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Fetch and decode a stored session, or return None when absent."""
        raw = self.redis_client.get(self._session_key(session_id))
        if not raw:
            return None
        return json.loads(raw)

    def start_session_tracking(self, user_id: str, session_id: str):
        """Create the session record and log a ``session_start`` user action.

        The record is stored with a 24-hour expiry. Failures are logged and
        swallowed.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session tracking")
                return

            session_data = {
                "user_id": user_id,
                "session_id": session_id,
                "start_time": datetime.now().isoformat(),
                "interactions": [],
                "duration": 0,
                "page_views": 0,
                "actions": {},
            }
            self.redis_client.setex(
                self._session_key(session_id),
                self.ACTIVE_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log session start
            user_logger.log_user_action(user_id, "session_start", {
                "session_id": session_id
            })
            logger.info(f"Started session tracking: {session_id}")
        except Exception as e:
            logger.error(f"Failed to start session tracking: {e}")

    def track_interaction(self, user_id: str, session_id: str, interaction_type: str,
                          details: Optional[Dict[str, Any]] = None):
        """Record one interaction in a session, debounced per user and type.

        An event is dropped when the same user produced an event of the same
        type less than one second earlier. Otherwise the interaction is
        appended to the stored session, the per-type action count (and
        ``page_views`` for ``"page_view"`` events) is incremented, and the
        running duration is refreshed. Nothing is recorded when the session
        is not found. Failures are logged and swallowed.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for interaction tracking")
                return

            # Debounce repeated events: the marker is keyed by user and event
            # type (not by session) and holds the time of the last accepted event.
            now_ts = time.time()
            last_event_key = f"{self.LAST_EVENT_KEY_PREFIX}{user_id}:{interaction_type}"
            last_event_time = self.redis_client.get(last_event_key)
            if last_event_time and now_ts - float(last_event_time) < self.DEBOUNCE_WINDOW:
                return
            self.redis_client.setex(last_event_key, self.DEBOUNCE_KEY_TTL, now_ts)

            session_data = self._load_session(session_id)
            if session_data is None:
                return

            now = datetime.now()
            event_details = details or {}

            session_data["interactions"].append({
                "type": interaction_type,
                "timestamp": now.isoformat(),
                "details": event_details,
            })

            # Update action counts
            actions = session_data["actions"]
            actions[interaction_type] = actions.get(interaction_type, 0) + 1

            # Update page views for navigation events
            if interaction_type == "page_view":
                session_data["page_views"] += 1

            # Update duration
            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["duration"] = (now - start_time).total_seconds()

            # A session that already has an end_time keeps the 30-day
            # retention; a late event must not shrink it back to 24 hours.
            if "end_time" in session_data:
                ttl = self.COMPLETED_SESSION_TTL
            else:
                ttl = self.ACTIVE_SESSION_TTL
            self.redis_client.setex(
                self._session_key(session_id), ttl, json.dumps(session_data)
            )

            # NOTE(review): the original indentation was lost; this call is
            # assumed to belong to the "session found" path - confirm.
            user_logger.log_user_action(user_id, f"session_interaction_{interaction_type}", {
                "session_id": session_id,
                "details": event_details
            })
        except Exception as e:
            logger.error(f"Failed to track interaction: {e}")

    def end_session_tracking(self, user_id: str, session_id: str):
        """Finalize a session and log a ``session_end`` summary.

        Stamps ``end_time`` and the final ``duration`` from a single clock
        reading so the two always agree, then re-stores the record with a
        30-day expiry. Does nothing when the session is not found. Failures
        are logged and swallowed.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session ending")
                return

            session_data = self._load_session(session_id)
            if session_data is None:
                return

            # One clock read, so duration == end_time - start_time exactly.
            ended_at = datetime.now()
            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["end_time"] = ended_at.isoformat()
            session_data["duration"] = (ended_at - start_time).total_seconds()

            # Save final session data
            self.redis_client.setex(
                self._session_key(session_id),
                self.COMPLETED_SESSION_TTL,
                json.dumps(session_data),
            )

            # Log session end with summary
            user_logger.log_user_action(user_id, "session_end", {
                "session_id": session_id,
                "duration": session_data["duration"],
                "interactions": len(session_data["interactions"]),
                "page_views": session_data["page_views"],
                "actions": session_data["actions"]
            })
            # NOTE(review): assumed to be logged only when a session was
            # actually finalized (original indentation was lost) - confirm.
            logger.info(f"Ended session tracking: {session_id}")
        except Exception as e:
            logger.error(f"Failed to end session tracking: {e}")

    def get_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored session document, or None.

        None is returned when Redis is unavailable, the session is not
        found, or reading/decoding fails (the failure is logged).
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session summary")
                return None
            return self._load_session(session_id)
        except Exception as e:
            # The identifier here is a session id, so the message says so.
            logger.error(f"Failed to get session summary for session {session_id}: {e}")
            return None

    def get_user_sessions(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return recent sessions for a user.

        Not implemented yet: sessions are keyed by session id only, so a
        per-user lookup needs an additional index. Always returns an empty
        list; ``limit`` is currently unused.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for user sessions")
                return []
            # This would require a more complex indexing system;
            # until that exists there is nothing to return.
            return []
        except Exception as e:
            logger.error(f"Failed to get user sessions for user {user_id}: {e}")
            return []
# Global instance: a shared SessionAnalytics created when this module is
# imported (its constructor calls redis_client.get_client()).
session_analytics = SessionAnalytics()
|