import time
import logging
from typing import List, Dict, Optional, Union

from src.llm.base_provider import LLMProvider
from src.llm.hf_provider import HuggingFaceProvider
from src.llm.ollama_provider import OllamaProvider
from utils.config import config

logger = logging.getLogger(__name__)


class CoordinatedProvider(LLMProvider):
    """Coordinated provider that orchestrates Ollama and HF interaction"""
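    # Coordination flow:
    #   1. Ask the HF endpoint (the primary expert) for a detailed answer.
    #   2. Ask Ollama for mentor-style commentary on that answer.
    #   3. Merge both parts into one formatted response.
    # If that pipeline fails entirely, an Ollama-only response with
    # self-commentary is used as a fallback.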
    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
        super().__init__(model_name, timeout, max_retries)
        self.hf_provider = None
        self.ollama_provider = None

        try:
            if config.hf_token:
                self.hf_provider = HuggingFaceProvider(
                    model_name="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
                    timeout=120
                )
        except Exception as e:
            logger.warning(f"Failed to initialize HF provider: {e}")

        try:
            if config.ollama_host:
                self.ollama_provider = OllamaProvider(
                    model_name=config.local_model_name,
                    timeout=60
                )
        except Exception as e:
            logger.warning(f"Failed to initialize Ollama provider: {e}")

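    # Both providers are optional at runtime: if one is missing or fails,
    # the formatted response simply carries a placeholder for that part.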
    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate coordinated response using HF+Ollama"""
        try:
            return self._retry_with_backoff(self._generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated generation failed: {e}")
            raise

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate coordinated response with streaming support"""
        try:
            return self._retry_with_backoff(self._stream_generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated stream generation failed: {e}")
            raise

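    # Note: _retry_with_backoff is inherited from LLMProvider and is assumed to
    # invoke the given callable with the remaining arguments, retrying up to
    # max_retries times.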
    def _generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Main method that orchestrates both providers"""
        try:
            # 1) Detailed answer from the HF endpoint (primary expert).
            hf_response = self._get_hf_response(prompt, conversation_history)

            # 2) Ollama mentor commentary on that answer.
            ollama_commentary = self._get_ollama_commentary(prompt, hf_response, conversation_history)

            # 3) Merge both parts into one formatted response.
            coordinated_response = self._format_coordinated_response(hf_response, ollama_commentary)

            return coordinated_response

        except Exception as e:
            logger.error(f"Coordinated response generation failed: {e}")

            # Fallback: let Ollama answer on its own and comment on its own answer.
            if self.ollama_provider:
                try:
                    ollama_response = self.ollama_provider.generate(prompt, conversation_history)
                    self_commentary = self._get_ollama_self_commentary(prompt, ollama_response, conversation_history)
                    return self._format_fallback_response(ollama_response, self_commentary)
                except Exception as fallback_error:
                    logger.error(f"Ollama fallback also failed: {fallback_error}")

            raise Exception(f"Both HF Endpoint and Ollama failed: {str(e)}") from e

    def _stream_generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming coordinated generation"""
        # Streaming is currently simulated: the full coordinated response is
        # generated first and returned as a single chunk.
        full_response = self._generate_coordinated_response(prompt, conversation_history)
        return [full_response]

    def _get_hf_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get response from HF Endpoint (primary expert)"""
        if not self.hf_provider:
            return None

        try:
            logger.info("Getting detailed response from HF Endpoint (primary expert)...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None

    def _get_ollama_commentary(self, user_prompt: str, hf_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama commentary on HF response"""
        if not self.ollama_provider:
            return None

        try:
            logger.info("Getting Ollama commentary on HF response...")
            commentary = self.ollama_provider.generate_commentary(user_prompt, hf_response, conversation_history)
            logger.info("Ollama commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama commentary failed: {e}")
            return None

    def _get_ollama_self_commentary(self, user_prompt: str, ollama_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama self-commentary when HF fails"""
        if not self.ollama_provider:
            return None

        try:
            logger.info("Getting Ollama self-commentary on own response...")
            commentary = self.ollama_provider.generate_self_commentary(user_prompt, ollama_response, conversation_history)
            logger.info("Ollama self-commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama self-commentary failed: {e}")
            return None

    def _format_coordinated_response(self, hf_response: Optional[str], ollama_commentary: Optional[str]) -> str:
        """Format combined response with clear separation"""
        response_parts = []

        if hf_response:
            response_parts.append("HF Expert Analysis\n" + hf_response)
        else:
            response_parts.append("HF Expert Analysis\n*No response from HF Expert*")

        response_parts.append("\n" + "=" * 50 + "\n")

        if ollama_commentary:
            response_parts.append("Ollama Mentor Commentary\n" + ollama_commentary)
        else:
            response_parts.append("Ollama Mentor Commentary\n*I've reviewed the HF expert's response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)

    def _format_fallback_response(self, ollama_response: Optional[str], self_commentary: Optional[str]) -> str:
        """Format fallback response when HF fails"""
        response_parts = []

        if ollama_response:
            response_parts.append("Ollama Primary Response (HF Expert Unavailable)\n" + ollama_response)
        else:
            response_parts.append("Ollama Primary Response (HF Expert Unavailable)\n*No response generated*")

        response_parts.append("\n" + "=" * 50 + "\n")

        if self_commentary:
            response_parts.append("Ollama Self-Commentary\n" + self_commentary)
        else:
            response_parts.append("Ollama Self-Commentary\n*I've reviewed my own response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)

    def validate_model(self) -> bool:
        """Validate that at least one backend of the coordinated system is available"""
        hf_available = self.hf_provider is not None and self.hf_provider.validate_model()
        ollama_available = self.ollama_provider is not None and self.ollama_provider.validate_model()
        return hf_available or ollama_available

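# Module-level instance; callers are expected to import and reuse this shared
# provider rather than constructing their own.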
coordinated_provider = CoordinatedProvider("coordinated_model")
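
# Illustrative usage sketch: assumes config supplies working HF/Ollama
# settings; the exact shape of conversation-history dicts is defined by the
# underlying providers, not here.
if __name__ == "__main__":
    if coordinated_provider.validate_model():
        example_history: List[Dict] = []  # prior turns would go here
        reply = coordinated_provider.generate("Summarize what this coordinated provider does.", example_history)
        print(reply)
    else:
        print("Neither the HF Endpoint nor Ollama is currently available.")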