AI-Life-Coach-Streamlit2/src/llm/coordinated_provider.py
Implement Ollama-HF conversation coordination with expert-commentary approach (commit 9da7658)
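# Coordination flow (summary of the code below): the HF endpoint produces the primary
# "expert" answer, the local Ollama model adds mentor-style commentary on it, and the two
# are merged into one formatted reply; if that pipeline raises, the provider falls back to
# an Ollama-only answer annotated with Ollama's own self-commentary.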
import time
import logging
from typing import List, Dict, Optional, Union
from src.llm.base_provider import LLMProvider
from src.llm.hf_provider import HuggingFaceProvider
from src.llm.ollama_provider import OllamaProvider
from utils.config import config
logger = logging.getLogger(__name__)
class CoordinatedProvider(LLMProvider):
"""Coordinated provider that orchestrates Ollama and HF interaction"""
def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
super().__init__(model_name, timeout, max_retries)
self.hf_provider = None
self.ollama_provider = None
# Initialize providers
try:
if config.hf_token:
self.hf_provider = HuggingFaceProvider(
model_name="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
timeout=120
)
except Exception as e:
logger.warning(f"Failed to initialize HF provider: {e}")
try:
if config.ollama_host:
self.ollama_provider = OllamaProvider(
model_name=config.local_model_name,
timeout=60
)
except Exception as e:
logger.warning(f"Failed to initialize Ollama provider: {e}")
    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate coordinated response using HF + Ollama"""
        try:
            return self._retry_with_backoff(self._generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated generation failed: {e}")
            raise

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate coordinated response with streaming support"""
        try:
            return self._retry_with_backoff(self._stream_generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated stream generation failed: {e}")
            raise
    def _generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Main method that orchestrates both providers"""
        try:
            # Step 1: Get response from HF Endpoint (primary expert)
            hf_response = self._get_hf_response(prompt, conversation_history)
            # Step 2: Get Ollama commentary on HF response
            ollama_commentary = self._get_ollama_commentary(prompt, hf_response, conversation_history)
            # Step 3: Combine responses with clear formatting
            coordinated_response = self._format_coordinated_response(hf_response, ollama_commentary)
            return coordinated_response
        except Exception as e:
            logger.error(f"Coordinated response generation failed: {e}")
            # Fall back to Ollama only
            if self.ollama_provider:
                try:
                    ollama_response = self.ollama_provider.generate(prompt, conversation_history)
                    # Generate self-commentary on the fallback answer
                    self_commentary = self._get_ollama_self_commentary(prompt, ollama_response, conversation_history)
                    return self._format_fallback_response(ollama_response, self_commentary)
                except Exception as fallback_error:
                    logger.error(f"Ollama fallback also failed: {fallback_error}")
            # Reached when no Ollama fallback is configured or the fallback itself failed
            raise Exception(f"Both HF Endpoint and Ollama failed: {str(e)}")
    def _stream_generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming coordinated generation"""
        # For simplicity, return the full coordinated response as a single chunk
        full_response = self._generate_coordinated_response(prompt, conversation_history)
        return [full_response]
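    # Illustrative sketch only (not part of the original design): if progressive rendering
    # is wanted later, the combined text could instead be split into fixed-size chunks, e.g.
    #
    #     chunk_size = 400
    #     return [full_response[i:i + chunk_size] for i in range(0, len(full_response), chunk_size)]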
    def _get_hf_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get response from HF Endpoint (primary expert)"""
        if not self.hf_provider:
            return None
        try:
            logger.info("🚀 Getting detailed response from HF Endpoint (primary expert)...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("✅ HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None
    def _get_ollama_commentary(self, user_prompt: str, hf_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama commentary on the HF response (which may be None if HF failed)"""
        if not self.ollama_provider:
            return None
        try:
            logger.info("🐱 Getting Ollama commentary on HF response...")
            commentary = self.ollama_provider.generate_commentary(user_prompt, hf_response, conversation_history)
            logger.info("✅ Ollama commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama commentary failed: {e}")
            return None
    def _get_ollama_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama self-commentary when HF fails"""
        if not self.ollama_provider:
            return None
        try:
            logger.info("🐱 Getting Ollama self-commentary on own response...")
            commentary = self.ollama_provider.generate_self_commentary(user_prompt, ollama_response, conversation_history)
            logger.info("✅ Ollama self-commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama self-commentary failed: {e}")
            return None
    def _format_coordinated_response(self, hf_response: Optional[str], ollama_commentary: Optional[str]) -> str:
        """Format the combined response with clear separation between the two voices"""
        response_parts = []

        # Add HF Expert response
        if hf_response:
            response_parts.append("🤖 HF Expert Analysis\n" + hf_response)
        else:
            response_parts.append("🤖 HF Expert Analysis\n*No response from HF Expert*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama Mentor commentary
        if ollama_commentary:
            response_parts.append("🐱 Ollama Mentor Commentary\n" + ollama_commentary)
        else:
            response_parts.append("🐱 Ollama Mentor Commentary\n*I've reviewed the HF expert's response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)
    def _format_fallback_response(self, ollama_response: Optional[str], self_commentary: Optional[str]) -> str:
        """Format the fallback response when HF fails"""
        response_parts = []

        # Add Ollama main response with fallback indication
        if ollama_response:
            response_parts.append("🦙 Ollama Primary Response (HF Expert Unavailable)\n" + ollama_response)
        else:
            response_parts.append("🦙 Ollama Primary Response (HF Expert Unavailable)\n*No response generated*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama self-commentary
        if self_commentary:
            response_parts.append("🐱 Ollama Self-Commentary\n" + self_commentary)
        else:
            response_parts.append("🐱 Ollama Self-Commentary\n*I've reviewed my own response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)
    def validate_model(self) -> bool:
        """Validate if the coordinated system is available"""
        # At least one provider must be available
        hf_available = bool(self.hf_provider and self.hf_provider.validate_model())
        ollama_available = bool(self.ollama_provider and self.ollama_provider.validate_model())
        return hf_available or ollama_available
# Global instance
coordinated_provider = CoordinatedProvider("coordinated_model")
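
# Minimal usage sketch (not part of the original file): exercises the global instance the
# way a caller might; assumes config supplies a valid HF token and/or a reachable Ollama host.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if coordinated_provider.validate_model():
        reply = coordinated_provider.generate(
            "How can I build a more consistent morning routine?",
            conversation_history=[],
        )
        print(reply)
    else:
        print("No provider available - check HF token / Ollama host configuration.")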