"""User session management on top of the ``core.memory`` state helpers."""

import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional

from core.memory import load_user_state, save_user_state

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SessionManager:
    """Manages user sessions and conversation context.

    Sessions are plain dictionaries loaded and stored through
    ``load_user_state`` / ``save_user_state``. Nested lists and dicts are
    JSON-encoded before saving and decoded again on load.
    """

    def __init__(self, session_timeout: int = 3600):
        """Initialize session manager.

        Args:
            session_timeout: Session timeout in seconds (default: 1 hour)
        """
        self.session_timeout = session_timeout

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _deserialize_state(state: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``state`` with JSON-looking strings decoded.

        Only strings that start with ``[`` or ``{`` are parsed; anything
        that is not valid JSON is kept as the original string.
        """
        decoded: Dict[str, Any] = {}
        for key, value in state.items():
            if isinstance(value, str) and value.startswith(('[', '{')):
                try:
                    value = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    # Keep as string if not valid JSON
                    pass
            decoded[key] = value
        return decoded

    @staticmethod
    def _serialize_session(session: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten a session into values suitable for ``save_user_state``.

        Lists and dicts become JSON strings, primitive scalars are passed
        through unchanged, and every other type (``None`` included) is
        converted with ``str()``. All update methods share this helper so
        they serialize identically.
        """
        flat: Dict[str, Any] = {}
        for key, value in session.items():
            if isinstance(value, (list, dict)):
                flat[key] = json.dumps(value, default=str)
            elif isinstance(value, (int, float, str, bool)):
                flat[key] = value
            else:
                flat[key] = str(value)
        return flat

    @staticmethod
    def _ensure_counters(
        session: Dict[str, Any], key: str, defaults: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Return ``session[key]`` as a dict containing every default key.

        Stored values win over defaults. A missing or non-dict stored
        value (for example a string that failed JSON decoding) is replaced
        by the defaults so the counters can always be incremented.
        """
        stats = dict(defaults)
        stored = session.get(key)
        if isinstance(stored, dict):
            stats.update(stored)
        session[key] = stats
        return stats

    def _persist(self, user_id: str, session: Dict[str, Any], label: str) -> bool:
        """Serialize and save ``session``, logging the outcome.

        Args:
            user_id: Unique identifier for the user
            label: Short description used in the log messages

        Returns:
            Whatever ``save_user_state`` returns (truthy on success)
        """
        result = save_user_state(user_id, self._serialize_session(session))
        if result:
            logger.debug(f"Successfully updated {label} for user {user_id}")
        else:
            logger.warning(f"Failed to save {label} for user {user_id}")
        return result

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_session(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user session data.

        A fresh (unsaved) session is returned when no state exists, when
        the stored session is older than ``session_timeout``, or when
        loading fails for any reason.

        Args:
            user_id: Unique identifier for the user

        Returns:
            Dictionary containing session data
        """
        try:
            state = load_user_state(user_id)
            if not state:
                logger.info(f"Creating new session for user {user_id}")
                return self._create_new_session()

            # Check if session has expired
            last_activity = float(state.get('last_activity', 0))
            if time.time() - last_activity > self.session_timeout:
                logger.info(f"Session expired for user {user_id}, creating new session")
                return self._create_new_session()

            # Deserialize complex data types
            return self._deserialize_state(state)
        except Exception as e:
            logger.error(f"Error retrieving session for user {user_id}: {e}")
            return self._create_new_session()

    def update_session(self, user_id: str, data: Dict[str, Any]) -> bool:
        """Update user session data.

        Merges ``data`` into the current session, refreshes
        ``last_activity`` and saves the result.

        Args:
            user_id: Unique identifier for the user
            data: Data to update in the session

        Returns:
            Boolean indicating success
        """
        try:
            session = self.get_session(user_id)
            session.update(data)
            session['last_activity'] = time.time()
            return self._persist(user_id, session, "session")
        except Exception as e:
            logger.error(f"Error updating session for user {user_id}: {e}")
            return False

    def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool:
        """Update session with AI coordination data.

        Increments the request counter, stamps ``last_coordination`` and
        counts an Ollama response when ``ai_data`` has an
        ``immediate_response`` key and an HF response when
        ``ai_data['hf_response']`` is truthy.

        Args:
            user_id: Unique identifier for the user
            ai_data: Coordination result to record

        Returns:
            Boolean indicating success
        """
        try:
            session = self.get_session(user_id)

            coord_data = self._ensure_counters(
                session,
                'ai_coordination',
                {
                    'requests_processed': 0,
                    'ollama_responses': 0,
                    'hf_responses': 0,
                    'last_coordination': None,
                },
            )
            coord_data['requests_processed'] += 1
            coord_data['last_coordination'] = datetime.now().isoformat()

            # Track response types
            if 'immediate_response' in ai_data:
                coord_data['ollama_responses'] += 1
            if ai_data.get('hf_response'):
                coord_data['hf_responses'] += 1

            return self._persist(user_id, session, "coordination session")
        except Exception as e:
            logger.error(f"Error updating coordination session for user {user_id}: {e}")
            return False

    def update_hierarchical_coordination(self, user_id: str, coordination_data: Dict) -> bool:
        """Update session with hierarchical coordination data.

        Increments ``total_conversations``, stamps ``last_coordination``
        and bumps the HF / Ollama / success counters for each truthy
        ``hf_engaged`` / ``ollama_responded`` / ``success`` flag.

        Args:
            user_id: Unique identifier for the user
            coordination_data: Flags describing the coordination outcome

        Returns:
            Boolean indicating success
        """
        try:
            session = self.get_session(user_id)

            coord_stats = self._ensure_counters(
                session,
                'hierarchical_coordination',
                {
                    'total_conversations': 0,
                    'hf_engagements': 0,
                    'ollama_responses': 0,
                    'coordination_success': 0,
                    'last_coordination': None,
                },
            )

            # Update statistics
            coord_stats['total_conversations'] += 1
            coord_stats['last_coordination'] = datetime.now().isoformat()

            # Update specific counters based on coordination data
            flag_to_counter = (
                ('hf_engaged', 'hf_engagements'),
                ('ollama_responded', 'ollama_responses'),
                ('success', 'coordination_success'),
            )
            for flag, counter in flag_to_counter:
                if coordination_data.get(flag):
                    coord_stats[counter] += 1

            return self._persist(user_id, session, "hierarchical coordination session")
        except Exception as e:
            logger.error(f"Error updating hierarchical coordination for user {user_id}: {e}")
            return False

    def add_hf_expert_to_conversation(self, user_id: str, hf_analysis: str) -> bool:
        """Add HF expert participation to conversation history.

        Args:
            user_id: Unique identifier for the user
            hf_analysis: Text of the HF expert analysis to append

        Returns:
            Boolean indicating success
        """
        try:
            session = self.get_session(user_id)

            # Add HF expert entry to conversation
            hf_entry = {
                "role": "assistant",
                "content": hf_analysis,
                "timestamp": datetime.now().isoformat(),
                "type": "hf_expert_analysis",
            }
            conversation = session.get("conversation", [])
            conversation.append(hf_entry)

            # update_session reloads the session itself, so hand it only the
            # field that changed rather than overwriting every key with this
            # (possibly stale) snapshot.
            return self.update_session(user_id, {"conversation": conversation})
        except Exception as e:
            logger.error(f"Error adding HF expert to conversation: {e}")
            return False

    def get_hierarchical_stats(self, user_id: str) -> Dict:
        """Get hierarchical coordination statistics.

        Args:
            user_id: Unique identifier for the user

        Returns:
            The session's ``hierarchical_coordination`` value, or an empty
            dict when it is absent or an error occurs
        """
        try:
            return self.get_session(user_id).get('hierarchical_coordination', {})
        except Exception as e:
            logger.error(f"Error getting hierarchical stats for user {user_id}: {e}")
            return {}

    def clear_session(self, user_id: str) -> bool:
        """Clear user session data by saving an empty state.

        Args:
            user_id: Unique identifier for the user

        Returns:
            Boolean indicating success
        """
        try:
            result = save_user_state(user_id, {})
            if result:
                logger.info(f"Cleared session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error clearing session for user {user_id}: {e}")
            return False

    def _create_new_session(self) -> Dict[str, Any]:
        """Create a new session with default values.

        Returns:
            Dictionary containing new session data
        """
        session = {
            'conversation': [],
            'preferences': {},
            'last_activity': time.time(),
            'created_at': time.time(),
        }
        logger.debug("Created new session")
        return session


# Global session manager instance
session_manager = SessionManager()