import time
import logging
from typing import List, Dict, Optional, Union
from src.llm.base_provider import LLMProvider
from src.llm.hf_provider import HuggingFaceProvider
from src.llm.ollama_provider import OllamaProvider
from utils.config import config
logger = logging.getLogger(__name__)


class CoordinatedProvider(LLMProvider):
"""Coordinated provider that orchestrates Ollama and HF interaction"""
def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
super().__init__(model_name, timeout, max_retries)
self.hf_provider = None
self.ollama_provider = None
# Initialize providers
try:
if config.hf_token:
self.hf_provider = HuggingFaceProvider(
model_name="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
timeout=120
)
except Exception as e:
logger.warning(f"Failed to initialize HF provider: {e}")
try:
if config.ollama_host:
self.ollama_provider = OllamaProvider(
model_name=config.local_model_name,
timeout=60
)
except Exception as e:
logger.warning(f"Failed to initialize Ollama provider: {e}")
    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate coordinated response using HF+Ollama"""
        try:
            return self._retry_with_backoff(self._generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated generation failed: {e}")
            raise

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate coordinated response with streaming support"""
        try:
            return self._retry_with_backoff(self._stream_generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated stream generation failed: {e}")
            raise

    def _generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Main method that orchestrates both providers"""
        try:
            # Step 1: Get response from HF Endpoint (primary expert)
            hf_response = self._get_hf_response(prompt, conversation_history)

            # Step 2: Get Ollama commentary on HF response
            ollama_commentary = self._get_ollama_commentary(prompt, hf_response, conversation_history)

            # Step 3: Combine responses with clear formatting
            coordinated_response = self._format_coordinated_response(hf_response, ollama_commentary)
            return coordinated_response
        except Exception as e:
            logger.error(f"Coordinated response generation failed: {e}")
            # Fallback to Ollama only
            if self.ollama_provider:
                try:
                    ollama_response = self.ollama_provider.generate(prompt, conversation_history)
                    # Generate self-commentary
                    self_commentary = self._get_ollama_self_commentary(prompt, ollama_response, conversation_history)
                    return self._format_fallback_response(ollama_response, self_commentary)
                except Exception as fallback_error:
                    logger.error(f"Ollama fallback also failed: {fallback_error}")
            raise Exception(f"Both HF Endpoint and Ollama failed: {str(e)}")

    def _stream_generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming coordinated generation"""
        # For simplicity, we'll return the full response as chunks
        full_response = self._generate_coordinated_response(prompt, conversation_history)
        return [full_response]
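    # Note: this is not true incremental streaming. A hypothetical chunked variant
    # could split the combined text instead, e.g.:
    #   return full_response.split("\n\n")
    # if downstream consumers expect multiple chunks rather than one block.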
    def _get_hf_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get response from HF Endpoint (primary expert)"""
        if not self.hf_provider:
            return None
        try:
            logger.info("Getting detailed response from HF Endpoint (primary expert)...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None

    def _get_ollama_commentary(self, user_prompt: str, hf_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama commentary on HF response"""
        if not self.ollama_provider:
            return None
        try:
            logger.info("Getting Ollama commentary on HF response...")
            commentary = self.ollama_provider.generate_commentary(user_prompt, hf_response, conversation_history)
            logger.info("Ollama commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama commentary failed: {e}")
            return None

    def _get_ollama_self_commentary(self, user_prompt: str, ollama_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama self-commentary when HF fails"""
        if not self.ollama_provider:
            return None
        try:
            logger.info("Getting Ollama self-commentary on own response...")
            commentary = self.ollama_provider.generate_self_commentary(user_prompt, ollama_response, conversation_history)
            logger.info("Ollama self-commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama self-commentary failed: {e}")
            return None
    def _format_coordinated_response(self, hf_response: Optional[str], ollama_commentary: Optional[str]) -> str:
        """Format combined response with clear separation"""
        response_parts = []

        # Add HF Expert response
        if hf_response:
            response_parts.append("HF Expert Analysis\n" + hf_response)
        else:
            response_parts.append("HF Expert Analysis\n*No response from HF Expert*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama Mentor commentary
        if ollama_commentary:
            response_parts.append("Ollama Mentor Commentary\n" + ollama_commentary)
        else:
            response_parts.append("Ollama Mentor Commentary\n*I've reviewed the HF expert's response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)
    def _format_fallback_response(self, ollama_response: Optional[str], self_commentary: Optional[str]) -> str:
        """Format fallback response when HF fails"""
        response_parts = []

        # Add Ollama main response with fallback indication
        if ollama_response:
            response_parts.append("Ollama Primary Response (HF Expert Unavailable)\n" + ollama_response)
        else:
            response_parts.append("Ollama Primary Response (HF Expert Unavailable)\n*No response generated*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama self-commentary
        if self_commentary:
            response_parts.append("Ollama Self-Commentary\n" + self_commentary)
        else:
            response_parts.append("Ollama Self-Commentary\n*I've reviewed my own response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)
    def validate_model(self) -> bool:
        """Validate if coordinated system is available"""
        # At least one provider must be available
        hf_available = bool(self.hf_provider and self.hf_provider.validate_model())
        ollama_available = bool(self.ollama_provider and self.ollama_provider.validate_model())
        return hf_available or ollama_available
# Global instance
coordinated_provider = CoordinatedProvider("coordinated_model")
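# Example usage (assumes at least one backend is configured via `config` and reachable;
# the prompt and history below are placeholders):
if __name__ == "__main__":
    history: List[Dict] = []  # prior chat turns, if any
    if coordinated_provider.validate_model():
        reply = coordinated_provider.generate("Explain what this provider does.", history)
        print(reply)
    else:
        logger.error("Neither the HF Endpoint nor Ollama is available")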