# rdune71 - commit dc6e56f: Fix critical issues: session state error, Ollama timeouts, redundant logging
import json
import time
from typing import Dict, Any, Optional
from core.memory import load_user_state, save_user_state
import logging
from datetime import datetime
# Set up logging.
# NOTE: basicConfig() runs at import time and configures the root logger at
# INFO level; per the logging docs it does nothing if the root logger already
# has handlers configured.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by SessionManager below.
logger = logging.getLogger(__name__)
class SessionManager:
    """Manage per-user session state and conversation context.

    Sessions are plain dictionaries loaded with ``load_user_state`` and
    stored with ``save_user_state`` (both from ``core.memory``). Nested
    values (lists and dicts) are written as JSON strings and decoded again
    when the session is read. A stored session whose ``last_activity`` is
    older than ``session_timeout`` seconds is replaced by a fresh session on
    the next read.
    """

    # Counters kept under the 'ai_coordination' session key.
    _AI_COUNTERS = ('requests_processed', 'ollama_responses', 'hf_responses')
    # Counters kept under the 'hierarchical_coordination' session key.
    _HIERARCHICAL_COUNTERS = (
        'total_conversations',
        'hf_engagements',
        'ollama_responses',
        'coordination_success',
    )

    def __init__(self, session_timeout: int = 7200):
        """Initialize the session manager.

        Args:
            session_timeout: Inactivity timeout in seconds (default: 7200,
                i.e. 2 hours).
        """
        self.session_timeout = session_timeout

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _serialize_session(session: Dict[str, Any]) -> Dict[str, Any]:
        """Return a flat copy of *session* suitable for ``save_user_state``.

        Lists and dicts become JSON strings, numbers and strings are kept
        as-is, and anything else (e.g. ``None``) is converted with ``str``.
        """
        flat: Dict[str, Any] = {}
        for key, value in session.items():
            if isinstance(value, (list, dict)):
                # default=str keeps non-JSON values (e.g. datetimes) storable.
                flat[key] = json.dumps(value, default=str)
            elif isinstance(value, (int, float, str)):  # bool is an int
                flat[key] = value
            else:
                flat[key] = str(value)
        return flat

    @staticmethod
    def _deserialize_state(state: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *state* with JSON-looking strings decoded.

        Only strings starting with ``[`` or ``{`` are parsed; a string that
        is not valid JSON is kept unchanged.
        """
        decoded: Dict[str, Any] = {}
        for key, value in state.items():
            if isinstance(value, str) and value.startswith(('[', '{')):
                try:
                    value = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    pass  # Keep as string if not valid JSON
            decoded[key] = value
        return decoded

    def _is_expired(self, state: Dict[str, Any],
                    now: Optional[float] = None) -> bool:
        """Tell whether *state* has been inactive longer than the timeout.

        A missing ``last_activity`` is treated as 0 (always expired). A value
        that cannot be converted to ``float`` raises, which ``get_session``
        handles by starting a new session.
        """
        last_activity = float(state.get('last_activity', 0))
        current = time.time() if now is None else now
        return current - last_activity > self.session_timeout

    @staticmethod
    def _coordination_stats(session: Dict[str, Any], key: str,
                            counters) -> Dict[str, Any]:
        """Return the stats dict stored under *key*, creating/repairing it.

        Missing counters are initialised to 0 and ``last_coordination`` to
        ``None`` so that a partially stored or malformed entry does not make
        every later update fail.
        """
        stats = session.get(key)
        if not isinstance(stats, dict):
            stats = {}
            session[key] = stats
        for counter in counters:
            stats.setdefault(counter, 0)
        stats.setdefault('last_coordination', None)
        return stats

    def _persist(self, user_id: str, session: Dict[str, Any],
                 label: str = "session"):
        """Refresh ``last_activity``, serialize and save *session*.

        Returns whatever ``save_user_state`` returns. *label* only affects
        the log messages.
        """
        session['last_activity'] = time.time()  # Always update last activity
        result = save_user_state(user_id, self._serialize_session(session))
        if result:
            logger.debug("Successfully updated %s for user %s", label, user_id)
        else:
            logger.warning("Failed to save %s for user %s", label, user_id)
        return result

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_session(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user session data.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            The stored session with JSON fields decoded, or a new default
            session when nothing is stored, the stored session has expired,
            or loading fails.
        """
        try:
            state = load_user_state(user_id)
            if not state:
                logger.info("Creating new session for user %s", user_id)
                return self._create_new_session()

            if self._is_expired(state):
                logger.info(
                    "Session expired for user %s, creating new session",
                    user_id,
                )
                return self._create_new_session()

            return self._deserialize_state(state)
        except Exception as e:
            logger.error("Error retrieving session for user %s: %s", user_id, e)
            return self._create_new_session()

    def update_session(self, user_id: str, data: Dict[str, Any]) -> bool:
        """Merge *data* into the user's session and save it.

        Args:
            user_id: Unique identifier for the user.
            data: Keys to add or overwrite in the session.

        Returns:
            The result of ``save_user_state``, or ``False`` on error.
        """
        try:
            session = self.get_session(user_id)
            session.update(data)
            return self._persist(user_id, session, "session")
        except Exception as e:
            logger.error("Error updating session for user %s: %s", user_id, e)
            return False

    def update_session_with_ai_coordination(self, user_id: str,
                                            ai_data: Dict) -> bool:
        """Update the ``ai_coordination`` counters in the user's session.

        Increments ``requests_processed`` on every call, ``ollama_responses``
        when *ai_data* contains the key ``immediate_response`` and
        ``hf_responses`` when ``ai_data['hf_response']`` is truthy.

        Returns:
            The result of ``save_user_state``, or ``False`` on error.
        """
        try:
            session = self.get_session(user_id)
            coord_data = self._coordination_stats(
                session, 'ai_coordination', self._AI_COUNTERS
            )
            coord_data['requests_processed'] += 1
            coord_data['last_coordination'] = datetime.now().isoformat()

            # Track response types
            if 'immediate_response' in ai_data:
                coord_data['ollama_responses'] += 1
            if ai_data.get('hf_response'):
                coord_data['hf_responses'] += 1

            # Saved through the shared path so serialization and the
            # last_activity refresh match update_session.
            return self._persist(user_id, session, "coordination session")
        except Exception as e:
            logger.error(
                "Error updating coordination session for user %s: %s",
                user_id, e,
            )
            return False

    def update_hierarchical_coordination(self, user_id: str,
                                         coordination_data: Dict) -> bool:
        """Update the ``hierarchical_coordination`` counters in the session.

        Increments ``total_conversations`` on every call and the
        ``hf_engagements`` / ``ollama_responses`` / ``coordination_success``
        counters when the ``hf_engaged`` / ``ollama_responded`` / ``success``
        entries of *coordination_data* are truthy.

        Returns:
            The result of ``save_user_state``, or ``False`` on error.
        """
        try:
            session = self.get_session(user_id)
            coord_stats = self._coordination_stats(
                session, 'hierarchical_coordination',
                self._HIERARCHICAL_COUNTERS,
            )
            coord_stats['total_conversations'] += 1
            coord_stats['last_coordination'] = datetime.now().isoformat()

            # Update specific counters based on coordination data
            if coordination_data.get('hf_engaged'):
                coord_stats['hf_engagements'] += 1
            if coordination_data.get('ollama_responded'):
                coord_stats['ollama_responses'] += 1
            if coordination_data.get('success'):
                coord_stats['coordination_success'] += 1

            return self._persist(
                user_id, session, "hierarchical coordination session"
            )
        except Exception as e:
            logger.error(
                "Error updating hierarchical coordination for user %s: %s",
                user_id, e,
            )
            return False

    def add_hf_expert_to_conversation(self, user_id: str,
                                      hf_analysis: str) -> bool:
        """Append an HF expert analysis entry to the conversation history.

        Args:
            user_id: Unique identifier for the user.
            hf_analysis: Text stored as the entry's ``content``.

        Returns:
            The result of ``update_session``, or ``False`` on error.
        """
        try:
            session = self.get_session(user_id)
            hf_entry = {
                "role": "assistant",
                "content": hf_analysis,
                "timestamp": datetime.now().isoformat(),
                "type": "hf_expert_analysis",
            }
            conversation = session.get("conversation", [])
            conversation.append(hf_entry)
            # Send only the changed key: update_session reloads the session,
            # so passing the whole (possibly stale) copy would just overwrite
            # every other field with the values read above.
            return self.update_session(user_id, {"conversation": conversation})
        except Exception as e:
            logger.error("Error adding HF expert to conversation: %s", e)
            return False

    def get_hierarchical_stats(self, user_id: str) -> Dict:
        """Return the ``hierarchical_coordination`` stats, or ``{}`` if none."""
        try:
            session = self.get_session(user_id)
            return session.get('hierarchical_coordination', {})
        except Exception as e:
            logger.error(
                "Error getting hierarchical stats for user %s: %s", user_id, e
            )
            return {}

    def clear_session(self, user_id: str) -> bool:
        """Clear user session data.

        Args:
            user_id: Unique identifier for the user.

        Returns:
            The result of ``save_user_state``, or ``False`` on error.
        """
        try:
            # NOTE(review): assumes save_user_state treats an empty mapping
            # as "clear the stored state" - confirm against core.memory.
            result = save_user_state(user_id, {})
            if result:
                logger.info("Cleared session for user %s", user_id)
            return result
        except Exception as e:
            logger.error("Error clearing session for user %s: %s", user_id, e)
            return False

    def _create_new_session(self) -> Dict[str, Any]:
        """Create a new session with default values."""
        # One timestamp so created_at and last_activity start out identical.
        now = time.time()
        session = {
            'conversation': [],
            'preferences': {},
            'last_activity': now,
            'created_at': now,
        }
        logger.info("Created new session")
        return session
# Global session manager instance, created at import time with the default
# session timeout (see SessionManager.__init__).
session_manager = SessionManager()