import logging
from typing import List, Dict, Optional, Union

from src.llm.base_provider import LLMProvider
from src.llm.hf_provider import HuggingFaceProvider
from src.llm.ollama_provider import OllamaProvider
from utils.config import config

logger = logging.getLogger(__name__)


class CoordinatedProvider(LLMProvider):
    """Coordinated provider that orchestrates Ollama and HF interaction."""

    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
        super().__init__(model_name, timeout, max_retries)
        self.hf_provider = None
        self.ollama_provider = None

        # Initialize providers; each one is optional and failures are non-fatal.
        try:
            if config.hf_token:
                self.hf_provider = HuggingFaceProvider(
                    model_name="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
                    timeout=120
                )
        except Exception as e:
            logger.warning(f"Failed to initialize HF provider: {e}")

        try:
            if config.ollama_host:
                self.ollama_provider = OllamaProvider(
                    model_name=config.local_model_name,
                    timeout=60
                )
        except Exception as e:
            logger.warning(f"Failed to initialize Ollama provider: {e}")

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a coordinated response using HF + Ollama."""
        try:
            return self._retry_with_backoff(self._generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated generation failed: {e}")
            raise

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate a coordinated response with streaming support."""
        try:
            return self._retry_with_backoff(self._stream_generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated stream generation failed: {e}")
            raise

    def _generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Main method that orchestrates both providers."""
        try:
            # Step 1: Get response from HF Endpoint (primary expert)
            hf_response = self._get_hf_response(prompt, conversation_history)

            # Step 2: Get Ollama commentary on the HF response
            ollama_commentary = self._get_ollama_commentary(prompt, hf_response, conversation_history)

            # Step 3: Combine responses with clear formatting
            return self._format_coordinated_response(hf_response, ollama_commentary)
        except Exception as e:
            logger.error(f"Coordinated response generation failed: {e}")
            # Fallback to Ollama only
            if self.ollama_provider:
                try:
                    ollama_response = self.ollama_provider.generate(prompt, conversation_history)
                    # Generate self-commentary on the fallback response
                    self_commentary = self._get_ollama_self_commentary(prompt, ollama_response, conversation_history)
                    return self._format_fallback_response(ollama_response, self_commentary)
                except Exception as fallback_error:
                    logger.error(f"Ollama fallback also failed: {fallback_error}")
            raise Exception(f"Both HF Endpoint and Ollama failed: {str(e)}")

    def _stream_generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming coordinated generation."""
        # For simplicity, return the full response as a single chunk.
        full_response = self._generate_coordinated_response(prompt, conversation_history)
        return [full_response]

    def _get_hf_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get response from HF Endpoint (primary expert)."""
        if not self.hf_provider:
            return None
        try:
            logger.info("🚀 Getting detailed response from HF Endpoint (primary expert)...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("✅ HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None

    def _get_ollama_commentary(self, user_prompt: str, hf_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama commentary on the HF response."""
        if not self.ollama_provider:
            return None
        try:
            logger.info("🐱 Getting Ollama commentary on HF response...")
            commentary = self.ollama_provider.generate_commentary(user_prompt, hf_response, conversation_history)
            logger.info("✅ Ollama commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama commentary failed: {e}")
            return None

    def _get_ollama_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama self-commentary when HF fails."""
        if not self.ollama_provider:
            return None
        try:
            logger.info("🐱 Getting Ollama self-commentary on own response...")
            commentary = self.ollama_provider.generate_self_commentary(user_prompt, ollama_response, conversation_history)
            logger.info("✅ Ollama self-commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama self-commentary failed: {e}")
            return None

    def _format_coordinated_response(self, hf_response: Optional[str], ollama_commentary: Optional[str]) -> str:
        """Format the combined response with clear separation."""
        response_parts = []

        # Add HF Expert response
        if hf_response:
            response_parts.append("🤖 HF Expert Analysis\n" + hf_response)
        else:
            response_parts.append("🤖 HF Expert Analysis\n*No response from HF Expert*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama Mentor commentary
        if ollama_commentary:
            response_parts.append("🐱 Ollama Mentor Commentary\n" + ollama_commentary)
        else:
            response_parts.append("🐱 Ollama Mentor Commentary\n*I've reviewed the HF expert's response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)

    def _format_fallback_response(self, ollama_response: Optional[str], self_commentary: Optional[str]) -> str:
        """Format the fallback response when HF fails."""
        response_parts = []

        # Add Ollama main response with fallback indication
        if ollama_response:
            response_parts.append("🦙 Ollama Primary Response (HF Expert Unavailable)\n" + ollama_response)
        else:
            response_parts.append("🦙 Ollama Primary Response (HF Expert Unavailable)\n*No response generated*")

        # Add separator
        response_parts.append("\n" + "=" * 50 + "\n")

        # Add Ollama self-commentary
        if self_commentary:
            response_parts.append("🐱 Ollama Self-Commentary\n" + self_commentary)
        else:
            response_parts.append("🐱 Ollama Self-Commentary\n*I've reviewed my own response but couldn't provide additional insights.*")

        return "\n\n".join(response_parts)

    def validate_model(self) -> bool:
        """Validate that the coordinated system is available."""
        # At least one provider must be available
        hf_available = bool(self.hf_provider and self.hf_provider.validate_model())
        ollama_available = bool(self.ollama_provider and self.ollama_provider.validate_model())
        return hf_available or ollama_available


# Global instance
coordinated_provider = CoordinatedProvider("coordinated_model")
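

# A minimal usage sketch, assuming utils.config supplies hf_token, ollama_host and
# local_model_name and that at least one backing provider is reachable. The prompt
# strings are only illustrative; this block is not part of the module's public API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    if not coordinated_provider.validate_model():
        raise SystemExit("Neither the HF Endpoint nor Ollama is available")

    # Single-shot generation: HF expert analysis followed by Ollama commentary,
    # or the Ollama-only fallback block if the HF Endpoint is unreachable.
    answer = coordinated_provider.generate(
        "Explain the difference between a list and a tuple in Python.",
        conversation_history=[],
    )
    print(answer)

    # stream_generate currently returns the same content as a list of chunks.
    for chunk in coordinated_provider.stream_generate("Summarize the previous answer.", []):
        print(chunk)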