File size: 3,334 Bytes
9da7658 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import time
import logging
from typing import List, Dict, Optional
from src.llm.coordinated_provider import coordinated_provider
from core.session import session_manager
# Module-scoped logger, named after this module via __name__
logger = logging.getLogger(__name__)
class ConversationCoordinator:
    """Service for managing conversation flow between HF and Ollama.

    Delegates response generation to ``coordinated_provider``, stores the
    running transcript through ``session_manager``, and keeps a bounded
    per-user log of state entries on the instance.
    """

    # Maximum number of state entries kept per user by
    # ``track_conversation_state``; the oldest entries are dropped first.
    MAX_STATE_ENTRIES = 50

    def __init__(self):
        """Create a coordinator with an empty per-user state log."""
        # Maps user_id -> list of {"state", "timestamp", "details"} dicts.
        self.conversation_state: Dict[str, List[Dict]] = {}

    def coordinate_conversation(self, user_id: str, user_query: str) -> str:
        """Generate a reply to ``user_query`` and record the exchange.

        Args:
            user_id: Key passed to the session manager to fetch and update
                the session.
            user_query: The user's latest message.

        Returns:
            The provider's response, or the placeholder
            ``"I'm processing your request..."`` when the provider returns
            a falsy value. In the falsy case the session is not updated.

        Raises:
            Exception: Any error raised while reading the session, calling
                the provider, or updating the session is logged with its
                traceback and re-raised unchanged.
        """
        try:
            user_session = session_manager.get_session(user_id)

            # The provider gets a copy of the stored history plus the new
            # user turn, so the session's own list is not appended to here.
            conversation_history = user_session.get("conversation", []).copy()
            conversation_history.append({"role": "user", "content": user_query})

            response = coordinated_provider.generate(user_query, conversation_history)

            # Persist the exchange only when a response was produced.
            if response:
                # Built from the session's stored list, not from the list
                # that was handed to the provider.
                conversation = user_session.get("conversation", []).copy()
                conversation.append({"role": "user", "content": user_query})
                conversation.append({"role": "assistant", "content": response})
                session_manager.update_session(user_id, {"conversation": conversation})

            return response or "I'm processing your request..."
        except Exception as e:
            # logger.exception logs at ERROR level and attaches the traceback,
            # which a plain logger.error call would drop.
            logger.exception("Conversation coordination failed: %s", e)
            raise

    def track_conversation_state(self, user_id: str, state: str, details: Optional[Dict] = None):
        """Track conversation state between providers.

        Appends a timestamped entry to the log for ``user_id`` and trims the
        log to the most recent ``MAX_STATE_ENTRIES`` entries. Tracking is
        best-effort: any failure is logged as a warning and not raised.

        Args:
            user_id: Key under which the entry is stored.
            state: Label for the state being recorded.
            details: Optional extra data; stored as ``{}`` when omitted or
                falsy.
        """
        try:
            entries = self.conversation_state.setdefault(user_id, [])
            entries.append(
                {
                    "state": state,
                    "timestamp": time.time(),
                    "details": details or {},
                }
            )

            # Keep only the newest MAX_STATE_ENTRIES entries. Slicing from
            # an explicit start index (rather than [-cap:]) also behaves
            # correctly when the cap is 0.
            overflow = len(entries) - self.MAX_STATE_ENTRIES
            if overflow > 0:
                self.conversation_state[user_id] = entries[overflow:]
        except Exception as e:
            logger.warning("Failed to track conversation state: %s", e)

    def format_coordinated_response(self, hf_response: str, ollama_commentary: str) -> str:
        """Format combined response with clear separation.

        Thin wrapper that forwards both arguments to
        ``coordinated_provider._format_coordinated_response`` and returns
        its result unchanged.
        """
        return coordinated_provider._format_coordinated_response(hf_response, ollama_commentary)

    def handle_error_state(self, user_id: str, error: Exception) -> str:
        """Log ``error`` and return a user-facing apology message.

        Args:
            user_id: Identifier included in the log record.
            error: The exception being reported.

        Returns:
            A message containing at most the first 100 characters of
            ``str(error)`` followed by ``...``, or a generic fallback
            message if building or logging that message itself fails.
        """
        try:
            error_msg = str(error)
            logger.error("Conversation coordinator error for user %s: %s", user_id, error_msg)
            # Only a truncated prefix of the error text is shown to the user.
            return f"Sorry, I encountered an error while processing your request: {error_msg[:100]}..."
        except Exception as e:
            logger.error("Error handling failed: %s", e)
            return "Sorry, I encountered an unexpected error."
# Global instance: a single module-level coordinator created at import time
conversation_coordinator = ConversationCoordinator()
|