File size: 9,571 Bytes
c0ef6d4 0194326 e900a8d 0194326 c0ef6d4 28471a4 dc6e56f 2262f42 45df059 c0ef6d4 2262f42 c0ef6d4 0194326 c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 196ee96 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 196ee96 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 dac104e c0ef6d4 dac104e 28471a4 dac104e c0ef6d4 dc6e56f c0ef6d4 dac104e 196ee96 a20d863 dac104e 28471a4 dac104e dc6e56f c0ef6d4 0194326 c0ef6d4 0194326 e900a8d a20d863 e900a8d a20d863 0e216c6 a20d863 c0ef6d4 45df059 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 0194326 c0ef6d4 dac104e 0194326 c0ef6d4 dac104e 0194326 c0ef6d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
import json
import time
from typing import Dict, Any, Optional
from core.memory import load_user_state, save_user_state
import logging
from datetime import datetime
# Set up logging.
# basicConfig() configures the root logger at INFO level as a side effect of
# importing this module (it does nothing if the root logger already has
# handlers).
# NOTE(review): root-logger configuration usually belongs in the application
# entry point rather than in an imported module - confirm this is intended.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by SessionManager for all of its log output.
logger = logging.getLogger(__name__)
class SessionManager:
    """Manage user sessions and conversation context.

    Session state is loaded and persisted through ``load_user_state`` and
    ``save_user_state`` from ``core.memory``. Before saving, list and dict
    values are JSON-encoded so the stored mapping is flat; on load, string
    values that look like JSON containers are decoded back.
    """

    # Initial counters used by update_session_with_ai_coordination().
    _AI_COORDINATION_DEFAULTS = {
        'requests_processed': 0,
        'ollama_responses': 0,
        'hf_responses': 0,
        'last_coordination': None,
    }

    # Initial counters used by update_hierarchical_coordination().
    _HIERARCHICAL_DEFAULTS = {
        'total_conversations': 0,
        'hf_engagements': 0,
        'ollama_responses': 0,
        'coordination_success': 0,
        'last_coordination': None,
    }

    def __init__(self, session_timeout: int = 7200):
        """Initialize session manager.

        Args:
            session_timeout: Session timeout in seconds (default: 2 hours).
        """
        self.session_timeout = session_timeout

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _serialize_session(session: Dict[str, Any]) -> Dict[str, Any]:
        """Return a flat copy of ``session`` suitable for ``save_user_state``.

        Lists and dicts are JSON-encoded (non-JSON leaves fall back to
        ``str``), ints/floats/strings/bools are passed through unchanged, and
        any other value (for example ``None``) is converted with ``str``.
        """
        flat: Dict[str, Any] = {}
        for key, value in session.items():
            if isinstance(value, (list, dict)):
                flat[key] = json.dumps(value, default=str)
            elif isinstance(value, (int, float, str)):
                # bool is a subclass of int, so it is passed through here too.
                flat[key] = value
            else:
                flat[key] = str(value)
        return flat

    @staticmethod
    def _deserialize_state(state: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``state`` with JSON-looking strings decoded.

        Only strings that start with ``[`` or ``{`` are considered; a string
        that fails to parse is kept as-is.
        """
        restored: Dict[str, Any] = {}
        for key, value in state.items():
            if isinstance(value, str) and value.startswith(('[', '{')):
                try:
                    value = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    # Not valid JSON after all: keep the raw string.
                    pass
            restored[key] = value
        return restored

    @staticmethod
    def _coordination_stats(session: Dict[str, Any], key: str,
                            defaults: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure ``session[key]`` is a stats dict containing every default key.

        A missing entry, or one that is not a dict (e.g. a string that could
        not be decoded), is replaced by a fresh copy of ``defaults``; an
        existing dict keeps its values and only gains the keys it lacks.
        The resulting dict is stored in ``session`` and returned.
        """
        stats = dict(defaults)
        current = session.get(key)
        if isinstance(current, dict):
            stats.update(current)
        session[key] = stats
        return stats

    def _is_expired(self, state: Dict[str, Any]) -> bool:
        """Tell whether ``state`` has been idle longer than the timeout.

        A missing ``last_activity`` counts as 0, i.e. expired.
        """
        last_activity = float(state.get('last_activity', 0))
        return time.time() - last_activity > self.session_timeout

    def _persist(self, user_id: str, session: Dict[str, Any]) -> bool:
        """Serialize ``session`` and hand it to ``save_user_state``."""
        return save_user_state(user_id, self._serialize_session(session))

    def _create_new_session(self) -> Dict[str, Any]:
        """Create a new session with default values."""
        # One clock read so created_at and last_activity agree exactly.
        now = time.time()
        session = {
            'conversation': [],
            'preferences': {},
            'last_activity': now,
            'created_at': now,
        }
        logger.info("Created new session")
        return session

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_session(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user session data.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            Dictionary containing session data. A fresh default session is
            returned when nothing is stored, when the stored session has
            expired, or when loading fails for any reason.
        """
        try:
            state = load_user_state(user_id)
            if not state:
                logger.info(f"Creating new session for user {user_id}")
                return self._create_new_session()

            if self._is_expired(state):
                logger.info(f"Session expired for user {user_id}, creating new session")
                return self._create_new_session()

            return self._deserialize_state(state)
        except Exception as e:
            logger.error(f"Error retrieving session for user {user_id}: {e}")
            return self._create_new_session()

    def update_session(self, user_id: str, data: Dict[str, Any]) -> bool:
        """Merge ``data`` into the user's session and persist it.

        Args:
            user_id: Unique identifier for the user.
            data: Keys and values to merge into the current session.

        Returns:
            The result of ``save_user_state``, or False if an error occurred.
        """
        try:
            session = self.get_session(user_id)
            session.update(data)
            session['last_activity'] = time.time()  # Always update last activity

            result = self._persist(user_id, session)
            if result:
                logger.debug(f"Successfully updated session for user {user_id}")
            else:
                logger.warning(f"Failed to save session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error updating session for user {user_id}: {e}")
            return False

    def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool:
        """Update session with AI coordination data.

        Increments the ``ai_coordination`` counters stored in the session.

        Args:
            user_id: Unique identifier for the user.
            ai_data: Coordination result; the presence of an
                ``immediate_response`` key and a truthy ``hf_response`` value
                decide which response counters are incremented.

        Returns:
            The result of ``save_user_state``, or False if an error occurred.
        """
        try:
            session = self.get_session(user_id)
            coord_data = self._coordination_stats(
                session, 'ai_coordination', self._AI_COORDINATION_DEFAULTS)

            coord_data['requests_processed'] += 1
            coord_data['last_coordination'] = datetime.now().isoformat()

            # Track response types
            if 'immediate_response' in ai_data:
                coord_data['ollama_responses'] += 1
            if ai_data.get('hf_response'):
                coord_data['hf_responses'] += 1

            result = self._persist(user_id, session)
            if result:
                logger.debug(f"Successfully updated coordination session for user {user_id}")
            else:
                logger.warning(f"Failed to save coordination session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error updating coordination session for user {user_id}: {e}")
            return False

    def update_hierarchical_coordination(self, user_id: str, coordination_data: Dict) -> bool:
        """Update session with hierarchical coordination data.

        Increments the ``hierarchical_coordination`` counters stored in the
        session.

        Args:
            user_id: Unique identifier for the user.
            coordination_data: Flags read with ``.get``: ``hf_engaged``,
                ``ollama_responded`` and ``success``.

        Returns:
            The result of ``save_user_state``, or False if an error occurred.
        """
        try:
            session = self.get_session(user_id)
            coord_stats = self._coordination_stats(
                session, 'hierarchical_coordination', self._HIERARCHICAL_DEFAULTS)

            coord_stats['total_conversations'] += 1
            coord_stats['last_coordination'] = datetime.now().isoformat()

            # Update specific counters based on coordination data
            if coordination_data.get('hf_engaged'):
                coord_stats['hf_engagements'] += 1
            if coordination_data.get('ollama_responded'):
                coord_stats['ollama_responses'] += 1
            if coordination_data.get('success'):
                coord_stats['coordination_success'] += 1

            return self._persist(user_id, session)
        except Exception as e:
            logger.error(f"Error updating hierarchical coordination for user {user_id}: {e}")
            return False

    def add_hf_expert_to_conversation(self, user_id: str, hf_analysis: str) -> bool:
        """Add HF expert participation to conversation history.

        Args:
            user_id: Unique identifier for the user.
            hf_analysis: Text stored as the content of the new entry.

        Returns:
            The result of ``update_session``, or False if an error occurred.
        """
        try:
            session = self.get_session(user_id)

            hf_entry = {
                "role": "assistant",
                "content": hf_analysis,
                "timestamp": datetime.now().isoformat(),
                "type": "hf_expert_analysis",
            }

            conversation = session.get("conversation", [])
            conversation.append(hf_entry)
            session["conversation"] = conversation

            return self.update_session(user_id, session)
        except Exception as e:
            logger.error(f"Error adding HF expert to conversation: {e}")
            return False

    def get_hierarchical_stats(self, user_id: str) -> Dict:
        """Get hierarchical coordination statistics.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            The session's ``hierarchical_coordination`` entry, or an empty
            dict when it is absent or an error occurred.
        """
        try:
            session = self.get_session(user_id)
            return session.get('hierarchical_coordination', {})
        except Exception as e:
            logger.error(f"Error getting hierarchical stats for user {user_id}: {e}")
            return {}

    def clear_session(self, user_id: str) -> bool:
        """Clear user session data.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            Boolean indicating success.
        """
        try:
            # NOTE(review): this saves an empty mapping rather than deleting
            # the stored state; whether save_user_state accepts or ignores an
            # empty dict is not visible from this module - confirm.
            result = save_user_state(user_id, {})
            if result:
                logger.info(f"Cleared session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error clearing session for user {user_id}: {e}")
            return False
# Global session manager instance.
# Created at import time with the default session timeout so that importers
# of this module can share a single SessionManager object.
session_manager = SessionManager()
|