|
import json |
|
import time |
|
from typing import Dict, Any, Optional |
|
from core.memory import load_user_state, save_user_state |
|
import logging |
|
from datetime import datetime |
|
|
|
|
|
# Configure the root logger at INFO level when this module is imported.
# basicConfig() does nothing if the root logger already has handlers, so an
# application that configures logging first keeps its own setup.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
class SessionManager:
    """Manage per-user session state and conversation context.

    Sessions are plain dictionaries persisted through ``load_user_state`` and
    ``save_user_state``. List and dict values are JSON-encoded before saving
    and decoded again on load. A stored session whose ``last_activity`` is
    older than ``session_timeout`` seconds is replaced by a fresh one.
    """

    # Leading characters that mark a stored string as (probably) JSON-encoded.
    _JSON_PREFIXES = ('[', '{')

    def __init__(self, session_timeout: int = 7200):
        """Initialize session manager

        Args:
            session_timeout: Session timeout in seconds (default: 2 hours)
        """
        self.session_timeout = session_timeout

    @classmethod
    def _deserialize_state(cls, state: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``state`` with JSON-looking strings decoded.

        Only strings starting with ``[`` or ``{`` are tried; a string that
        fails to parse is kept unchanged. The input mapping is not modified.
        """
        decoded: Dict[str, Any] = {}
        for key, value in state.items():
            if isinstance(value, str) and value.startswith(cls._JSON_PREFIXES):
                try:
                    value = json.loads(value)
                except (json.JSONDecodeError, TypeError):
                    # Looked like JSON but is not; keep the raw string.
                    pass
            decoded[key] = value
        return decoded

    @staticmethod
    def _serialize_session(session: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten ``session`` into values suitable for ``save_user_state``.

        Lists and dicts become JSON strings (non-JSON members fall back to
        ``str``), ints/floats/strings/bools are kept as they are, and any
        other value is stringified. The input mapping is not modified.
        """
        payload: Dict[str, Any] = {}
        for key, value in session.items():
            if isinstance(value, (list, dict)):
                payload[key] = json.dumps(value, default=str)
            elif isinstance(value, (int, float, str, bool)):
                payload[key] = value
            else:
                payload[key] = str(value)
        return payload

    @staticmethod
    def _increment(stats: Dict[str, Any], key: str) -> None:
        """Add one to ``stats[key]``, treating a missing key as zero."""
        stats[key] = stats.get(key, 0) + 1

    def get_session(self, user_id: str) -> Dict[str, Any]:
        """Retrieve user session data

        A new default session is returned when nothing is stored for the
        user, when the stored session has expired, or when loading fails.

        Args:
            user_id: Unique identifier for the user

        Returns:
            Dictionary containing session data
        """
        try:
            state = load_user_state(user_id)
            if not state:
                logger.info(f"Creating new session for user {user_id}")
                return self._create_new_session()

            # A missing timestamp counts as epoch 0, i.e. already expired.
            last_activity = float(state.get('last_activity', 0))
            if time.time() - last_activity > self.session_timeout:
                logger.info(f"Session expired for user {user_id}, creating new session")
                return self._create_new_session()

            return self._deserialize_state(state)
        except Exception as e:
            logger.error(f"Error retrieving session for user {user_id}: {e}")
            return self._create_new_session()

    def update_session(self, user_id: str, data: Dict[str, Any]) -> bool:
        """Update user session data

        Merges ``data`` into the current session, refreshes ``last_activity``
        and saves the result.

        Args:
            user_id: Unique identifier for the user
            data: Keys and values to merge into the session

        Returns:
            The result of ``save_user_state``, or False if an error occurred
        """
        try:
            session = self.get_session(user_id)
            session.update(data)
            session['last_activity'] = time.time()

            result = save_user_state(user_id, self._serialize_session(session))
            if result:
                logger.debug(f"Successfully updated session for user {user_id}")
            else:
                logger.warning(f"Failed to save session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error updating session for user {user_id}: {e}")
            return False

    def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool:
        """Update session with AI coordination data

        Increments ``requests_processed`` on every call, ``ollama_responses``
        when ``ai_data`` has an ``immediate_response`` key, and
        ``hf_responses`` when ``ai_data['hf_response']`` is truthy.

        Args:
            user_id: Unique identifier for the user
            ai_data: Result data of one coordinated AI request

        Returns:
            The result of ``save_user_state``, or False if an error occurred
        """
        try:
            session = self.get_session(user_id)
            if 'ai_coordination' not in session:
                session['ai_coordination'] = {
                    'requests_processed': 0,
                    'ollama_responses': 0,
                    'hf_responses': 0,
                    'last_coordination': None,
                }

            coord_data = session['ai_coordination']
            self._increment(coord_data, 'requests_processed')
            coord_data['last_coordination'] = datetime.now().isoformat()

            if 'immediate_response' in ai_data:
                self._increment(coord_data, 'ollama_responses')
            if ai_data.get('hf_response'):
                self._increment(coord_data, 'hf_responses')

            # NOTE(review): unlike update_session, 'last_activity' is not
            # refreshed here - confirm that this is intended.
            result = save_user_state(user_id, self._serialize_session(session))
            if result:
                logger.debug(f"Successfully updated coordination session for user {user_id}")
            else:
                logger.warning(f"Failed to save coordination session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error updating coordination session for user {user_id}: {e}")
            return False

    def update_hierarchical_coordination(self, user_id: str, coordination_data: Dict) -> bool:
        """Update session with hierarchical coordination data

        Increments ``total_conversations`` on every call and the
        ``hf_engagements`` / ``ollama_responses`` / ``coordination_success``
        counters when the ``hf_engaged`` / ``ollama_responded`` / ``success``
        entries of ``coordination_data`` are truthy.

        Args:
            user_id: Unique identifier for the user
            coordination_data: Outcome flags of one coordination round

        Returns:
            The result of ``save_user_state``, or False if an error occurred
        """
        try:
            session = self.get_session(user_id)
            if 'hierarchical_coordination' not in session:
                session['hierarchical_coordination'] = {
                    'total_conversations': 0,
                    'hf_engagements': 0,
                    'ollama_responses': 0,
                    'coordination_success': 0,
                    'last_coordination': None,
                }

            coord_stats = session['hierarchical_coordination']
            self._increment(coord_stats, 'total_conversations')
            coord_stats['last_coordination'] = datetime.now().isoformat()

            if coordination_data.get('hf_engaged'):
                self._increment(coord_stats, 'hf_engagements')
            if coordination_data.get('ollama_responded'):
                self._increment(coord_stats, 'ollama_responses')
            if coordination_data.get('success'):
                self._increment(coord_stats, 'coordination_success')

            # NOTE(review): unlike update_session, 'last_activity' is not
            # refreshed here - confirm that this is intended.
            return save_user_state(user_id, self._serialize_session(session))
        except Exception as e:
            logger.error(f"Error updating hierarchical coordination for user {user_id}: {e}")
            return False

    def add_hf_expert_to_conversation(self, user_id: str, hf_analysis: str) -> bool:
        """Add HF expert participation to conversation history

        Appends an assistant entry of type ``hf_expert_analysis`` to the
        session's ``conversation`` list and saves it via ``update_session``.

        Args:
            user_id: Unique identifier for the user
            hf_analysis: Text of the expert analysis to record

        Returns:
            The result of ``update_session``, or False if an error occurred
        """
        try:
            session = self.get_session(user_id)
            hf_entry = {
                "role": "assistant",
                "content": hf_analysis,
                "timestamp": datetime.now().isoformat(),
                "type": "hf_expert_analysis",
            }

            conversation = session.get("conversation", [])
            conversation.append(hf_entry)

            # update_session reloads the session itself, so only the changed
            # key is passed instead of the whole snapshot read above.
            return self.update_session(user_id, {"conversation": conversation})
        except Exception as e:
            logger.error(f"Error adding HF expert to conversation: {e}")
            return False

    def get_hierarchical_stats(self, user_id: str) -> Dict:
        """Get hierarchical coordination statistics

        Args:
            user_id: Unique identifier for the user

        Returns:
            The session's ``hierarchical_coordination`` entry, or an empty
            dict if it is absent or an error occurred
        """
        try:
            session = self.get_session(user_id)
            return session.get('hierarchical_coordination', {})
        except Exception as e:
            logger.error(f"Error getting hierarchical stats for user {user_id}: {e}")
            return {}

    def clear_session(self, user_id: str) -> bool:
        """Clear user session data

        Args:
            user_id: Unique identifier for the user

        Returns:
            Boolean indicating success
        """
        try:
            # NOTE(review): this saves an empty mapping; whether the backend
            # then drops previously stored fields is not visible here - confirm.
            result = save_user_state(user_id, {})
            if result:
                logger.info(f"Cleared session for user {user_id}")
            return result
        except Exception as e:
            logger.error(f"Error clearing session for user {user_id}: {e}")
            return False

    def _create_new_session(self) -> Dict[str, Any]:
        """Create a new session with default values"""
        # One clock read so creation and last-activity times are identical.
        now = time.time()
        session = {
            'conversation': [],
            'preferences': {},
            'last_activity': now,
            'created_at': now,
        }
        logger.info("Created new session")
        return session
|
|
|
|
|
# Shared module-level instance, created at import time with the default timeout.
session_manager = SessionManager()
|
|