import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

from core.redis_client import redis_client
from src.analytics.user_logger import user_logger


# Module-level logger named after this module, so its output can be filtered
# or routed independently of the rest of the application.
logger = logging.getLogger(__name__)


class SessionAnalytics:
    """Session-level tracking and analytics stored in Redis.

    Each session is kept as one JSON document under
    ``analytics:sessions:<session_id>``. Every public method is best-effort:
    a missing Redis client or any exception is logged and swallowed, so the
    analytics layer never raises into the calling code.
    """

    # Retention of a session document while it is still receiving events (1 day).
    _ACTIVE_TTL_SECONDS = 86400
    # Retention of a session document once it has been ended (30 days).
    _ENDED_TTL_SECONDS = 2592000
    # Identical (user, interaction type) events closer together than this are dropped.
    _DEBOUNCE_WINDOW_SECONDS = 1.0
    # Lifetime of the "last event" marker used for debouncing.
    _DEBOUNCE_KEY_TTL_SECONDS = 10
    # Upper bound on the number of session ids remembered per user.
    _USER_INDEX_MAX_ENTRIES = 100

    def __init__(self):
        """Bind the shared Redis connection obtained from ``redis_client``.

        The methods below treat a falsy client as "Redis unavailable" and
        degrade to no-ops.
        """
        self.redis_client = redis_client.get_client()

    @staticmethod
    def _session_key(session_id: str) -> str:
        """Return the Redis key holding the JSON document of ``session_id``."""
        return f"analytics:sessions:{session_id}"

    @staticmethod
    def _user_index_key(user_id: str) -> str:
        """Return the Redis key of the list of recent session ids of ``user_id``."""
        return f"analytics:user_sessions:{user_id}"

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return ``value`` as ``str``, decoding bytes as UTF-8.

        Redis replies may arrive as ``bytes`` or ``str`` depending on how the
        client was configured, so both are accepted.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return str(value)

    def _index_user_session(self, user_id: str, session_id: str) -> None:
        """Remember ``session_id`` as the most recent session of ``user_id``.

        Failures are logged and swallowed on purpose: the index only feeds
        ``get_user_sessions`` and must never prevent a session from starting.
        """
        try:
            index_key = self._user_index_key(user_id)
            self.redis_client.lpush(index_key, session_id)
            # Keep the list bounded so a very active user cannot grow it forever.
            self.redis_client.ltrim(index_key, 0, self._USER_INDEX_MAX_ENTRIES - 1)
            self.redis_client.expire(index_key, self._ENDED_TTL_SECONDS)
        except Exception as e:
            logger.warning(
                "Failed to index session %s for user %s: %s", session_id, user_id, e
            )

    def _debounce(self, user_id: str, interaction_type: str) -> bool:
        """Return True when this event repeats a very recent identical one.

        When the event is not a repeat, the current time is stored as the new
        "last event" marker and False is returned.
        """
        last_event_key = f"analytics:last_event:{user_id}:{interaction_type}"
        now = time.time()

        last_event_time = self.redis_client.get(last_event_key)
        if last_event_time:
            try:
                if now - float(last_event_time) < self._DEBOUNCE_WINDOW_SECONDS:
                    return True
            except (TypeError, ValueError):
                # An unparsable marker must not block tracking; overwrite it below.
                pass

        self.redis_client.setex(last_event_key, self._DEBOUNCE_KEY_TTL_SECONDS, now)
        return False

    def start_session_tracking(self, user_id: str, session_id: str) -> None:
        """Create the session document and log a ``session_start`` action.

        Args:
            user_id: Identifier of the user owning the session.
            session_id: Identifier of the session; also used in the Redis key.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session tracking")
                return

            session_data = {
                "user_id": user_id,
                "session_id": session_id,
                "start_time": datetime.now().isoformat(),
                "interactions": [],
                "duration": 0,
                "page_views": 0,
                "actions": {},
            }
            self.redis_client.setex(
                self._session_key(session_id),
                self._ACTIVE_TTL_SECONDS,
                json.dumps(session_data),
            )
            self._index_user_session(user_id, session_id)

            user_logger.log_user_action(
                user_id, "session_start", {"session_id": session_id}
            )
            logger.info("Started session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to start session tracking: %s", e)

    def track_interaction(
        self,
        user_id: str,
        session_id: str,
        interaction_type: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append one interaction to a session, with per-user debouncing.

        An event of the same ``interaction_type`` from the same user within
        one second of the previous one is dropped. If the session document
        does not exist (never started or expired) nothing is recorded.

        Args:
            user_id: Identifier of the acting user.
            session_id: Session the interaction belongs to.
            interaction_type: Event name; ``"page_view"`` also bumps the
                page-view counter.
            details: Optional JSON-serialisable payload stored with the event.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for interaction tracking")
                return

            if self._debounce(user_id, interaction_type):
                return

            key = self._session_key(session_id)
            session_data_str = self.redis_client.get(key)
            if not session_data_str:
                logger.debug(
                    "No session data for %s; interaction %s not recorded",
                    session_id,
                    interaction_type,
                )
                return

            session_data = json.loads(session_data_str)
            event_details = details or {}
            # One clock reading keeps the event timestamp and duration consistent.
            now = datetime.now()

            session_data.setdefault("interactions", []).append(
                {
                    "type": interaction_type,
                    "timestamp": now.isoformat(),
                    "details": event_details,
                }
            )

            actions = session_data.setdefault("actions", {})
            actions[interaction_type] = actions.get(interaction_type, 0) + 1

            if interaction_type == "page_view":
                session_data["page_views"] = session_data.get("page_views", 0) + 1

            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["duration"] = (now - start_time).total_seconds()

            # Rewriting with setex also renews the active-session TTL.
            self.redis_client.setex(
                key, self._ACTIVE_TTL_SECONDS, json.dumps(session_data)
            )

            user_logger.log_user_action(
                user_id,
                f"session_interaction_{interaction_type}",
                {"session_id": session_id, "details": event_details},
            )
        except Exception as e:
            logger.error("Failed to track interaction: %s", e)

    def end_session_tracking(self, user_id: str, session_id: str) -> None:
        """Close a session, persist its final state and log a summary.

        The finished document is re-stored with the longer 30-day retention.
        If the session document does not exist nothing is written or logged
        to the user log.

        Args:
            user_id: Identifier of the user owning the session.
            session_id: Identifier of the session to close.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session ending")
                return

            key = self._session_key(session_id)
            session_data_str = self.redis_client.get(key)
            if not session_data_str:
                logger.debug("No session data for %s; nothing to end", session_id)
                return

            session_data = json.loads(session_data_str)

            # Single clock reading so end_time and duration agree exactly.
            ended_at = datetime.now()
            start_time = datetime.fromisoformat(session_data["start_time"])
            session_data["end_time"] = ended_at.isoformat()
            session_data["duration"] = (ended_at - start_time).total_seconds()

            self.redis_client.setex(
                key, self._ENDED_TTL_SECONDS, json.dumps(session_data)
            )

            user_logger.log_user_action(
                user_id,
                "session_end",
                {
                    "session_id": session_id,
                    "duration": session_data["duration"],
                    "interactions": len(session_data.get("interactions", [])),
                    "page_views": session_data.get("page_views", 0),
                    "actions": session_data.get("actions", {}),
                },
            )
            logger.info("Ended session tracking: %s", session_id)
        except Exception as e:
            logger.error("Failed to end session tracking: %s", e)

    def get_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored document of ``session_id``.

        Returns:
            The decoded session dictionary, or ``None`` when Redis is
            unavailable, the session is unknown, or an error occurs.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for session summary")
                return None

            session_data_str = self.redis_client.get(self._session_key(session_id))
            if session_data_str:
                return json.loads(session_data_str)
            return None
        except Exception as e:
            logger.error(
                "Failed to get session summary for session %s: %s", session_id, e
            )
            return None

    def get_user_sessions(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the user's most recent sessions, newest first.

        Session ids come from the per-user list written by
        ``start_session_tracking``; ids whose document has expired are
        skipped. Sessions started before that list existed are not returned.

        Args:
            user_id: Identifier of the user.
            limit: Maximum number of sessions to return; values <= 0 yield [].

        Returns:
            A list of session dictionaries, empty when nothing is available
            or an error occurs.
        """
        try:
            if not self.redis_client:
                logger.warning("Redis client not available for user sessions")
                return []

            if limit <= 0:
                return []

            raw_ids = self.redis_client.lrange(self._user_index_key(user_id), 0, -1)

            sessions: List[Dict[str, Any]] = []
            seen = set()
            for raw_id in raw_ids or []:
                sid = self._as_text(raw_id)
                # A session started twice is listed twice; report it once.
                if sid in seen:
                    continue
                seen.add(sid)

                payload = self.redis_client.get(self._session_key(sid))
                if not payload:
                    continue  # session document expired or was removed
                sessions.append(json.loads(payload))
                if len(sessions) >= limit:
                    break
            return sessions
        except Exception as e:
            logger.error("Failed to get user sessions for user %s: %s", user_id, e)
            return []


# Shared module-level instance. Constructing it calls redis_client.get_client()
# at import time, so importing this module resolves the Redis client once.
session_analytics = SessionAnalytics()