|
import time |
|
import logging |
|
from typing import List, Dict, Optional |
|
from src.llm.coordinated_provider import coordinated_provider |
|
from core.session import session_manager |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class ConversationCoordinator: |
|
"""Service for managing conversation flow between HF and Ollama""" |
|
|
|
def __init__(self): |
|
self.conversation_state = {} |
|
|
|
def coordinate_conversation(self, user_id: str, user_query: str) -> str: |
|
"""Main coordination method""" |
|
try: |
|
|
|
user_session = session_manager.get_session(user_id) |
|
conversation_history = user_session.get("conversation", []).copy() |
|
|
|
|
|
conversation_history.append({"role": "user", "content": user_query}) |
|
|
|
|
|
response = coordinated_provider.generate(user_query, conversation_history) |
|
|
|
|
|
if response: |
|
conversation = user_session.get("conversation", []).copy() |
|
conversation.append({"role": "user", "content": user_query}) |
|
conversation.append({"role": "assistant", "content": response}) |
|
session_manager.update_session(user_id, {"conversation": conversation}) |
|
|
|
return response or "I'm processing your request..." |
|
|
|
except Exception as e: |
|
logger.error(f"Conversation coordination failed: {e}") |
|
raise |
|
|
|
def track_conversation_state(self, user_id: str, state: str, details: Dict = None): |
|
"""Track conversation state between providers""" |
|
try: |
|
if user_id not in self.conversation_state: |
|
self.conversation_state[user_id] = [] |
|
|
|
state_entry = { |
|
"state": state, |
|
"timestamp": time.time(), |
|
"details": details or {} |
|
} |
|
self.conversation_state[user_id].append(state_entry) |
|
|
|
|
|
if len(self.conversation_state[user_id]) > 50: |
|
self.conversation_state[user_id] = self.conversation_state[user_id][-50:] |
|
|
|
except Exception as e: |
|
logger.warning(f"Failed to track conversation state: {e}") |
|
|
|
def format_coordinated_response(self, hf_response: str, ollama_commentary: str) -> str: |
|
"""Format combined response with clear separation""" |
|
return coordinated_provider._format_coordinated_response(hf_response, ollama_commentary) |
|
|
|
def handle_error_state(self, user_id: str, error: Exception) -> str: |
|
"""Handle error conditions gracefully""" |
|
try: |
|
error_msg = str(error) |
|
logger.error(f"Conversation coordinator error for user {user_id}: {error_msg}") |
|
|
|
|
|
return f"Sorry, I encountered an error while processing your request: {error_msg[:100]}..." |
|
|
|
except Exception as e: |
|
logger.error(f"Error handling failed: {e}") |
|
return "Sorry, I encountered an unexpected error." |
|
|
|
|
|
conversation_coordinator = ConversationCoordinator() |
|
|