import requests
import json
import logging
import re
from typing import List, Dict, Optional, Union

from core.providers.base import LLMProvider
from utils.config import config

logger = logging.getLogger(__name__)


class OllamaProvider(LLMProvider):
    """Ollama LLM provider implementation with commentary support"""

    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 3):
        # Timeout increased to 120s to accommodate slow local generations
        super().__init__(model_name, timeout, max_retries)
        self.host = self._sanitize_host(config.ollama_host or "http://localhost:11434")
        self.headers = {
            "ngrok-skip-browser-warning": "true",
            "User-Agent": "CosmicCat-AI-Assistant"
        }

    def _sanitize_host(self, host: str) -> str:
        """Sanitize host URL by removing whitespace and control characters"""
        if not host:
            return "http://localhost:11434"
        host = host.strip()
        host = re.sub(r'[\r\n\t\0]+', '', host)
        if not host.startswith(('http://', 'https://')):
            host = 'http://' + host
        return host

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a response synchronously"""
        try:
            return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama generation failed: {e}")
            return None

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate a response with streaming support"""
        try:
            return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            return None

    def validate_model(self) -> bool:
        """Validate that the model is available"""
        try:
            response = requests.get(
                f"{self.host}/api/tags",
                headers=self.headers,
                timeout=self.timeout
            )
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [model.get("name") for model in models]
                return self.model_name in model_names
            elif response.status_code == 404:
                # Some proxies don't expose /api/tags; fall back to a simple reachability check
                response2 = requests.get(
                    f"{self.host}",
                    headers=self.headers,
                    timeout=self.timeout
                )
                return response2.status_code == 200
            return False
        except Exception as e:
            logger.warning(f"Model validation failed: {e}")
            return False

    def generate_commentary(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate commentary on the HF response"""
        try:
            commentary_prompt = self._create_commentary_prompt(user_prompt, hf_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama commentary generation failed: {e}")
            return None

    def generate_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate self-commentary on the provider's own response"""
        try:
            commentary_prompt = self._create_self_commentary_prompt(user_prompt, ollama_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama self-commentary generation failed: {e}")
            return None

    def _create_commentary_prompt(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> str:
        """Create prompt for Ollama to comment on the HF response"""
        conversation_context = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]  # Last 3 messages for context
        ])
        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze the interaction between a user and an expert AI, then provide insightful commentary.

ANALYZE THIS INTERACTION:
User Question: "{user_prompt}"
Expert Response: "{hf_response}"
Recent Conversation Context:
{conversation_context}

PROVIDE YOUR COMMENTARY IN THIS FORMAT:
I've reviewed the HF expert's response and here's my insight:

Key Points Observed:
[Point 1]
[Point 2]

My Perspective:
[Your commentary on the HF response]

Suggestions:
[Suggestion 1]
[Suggestion 2]

Keep your analysis concise but insightful. Focus on helping the user achieve their goals through better questioning and information gathering.
"""
        return prompt

    def _create_self_commentary_prompt(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> str:
        """Create prompt for Ollama to comment on its own response"""
        conversation_context = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]  # Last 3 messages for context
        ])
        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze your own response to a user question, then provide insightful self-reflection.

ANALYZE YOUR RESPONSE:
User Question: "{user_prompt}"
Your Response: "{ollama_response}"
Recent Conversation Context:
{conversation_context}

PROVIDE YOUR SELF-COMMENTARY IN THIS FORMAT:
I've reviewed my own response and here's my self-reflection:

Key Points Addressed:
[Point 1]
[Point 2]

My Self-Assessment:
[Your reflection on your own response quality]

Areas for Improvement:
[Area 1]
[Area 2]

Keep your analysis honest and constructive. Focus on how you could have provided better assistance.
"""
        return prompt

    def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Implementation of synchronous generation"""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})
            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": False
            }

            logger.info(f"Ollama request URL: {url}")
            logger.debug(f"Ollama request payload: {payload}")
            logger.debug(f"Ollama headers: {self.headers}")

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout
            )
            logger.info(f"Ollama response status: {response.status_code}")
            logger.debug(f"Ollama response headers: {dict(response.headers)}")
            response.raise_for_status()

            result = response.json()
            logger.debug(f"Ollama response body: {result}")

            # /api/chat returns {"message": {"content": ...}}; /api/generate returns {"response": ...}
            if "message" in result and "content" in result["message"]:
                content = result["message"]["content"]
            elif "response" in result:
                content = result["response"]
            else:
                content = str(result)

            logger.debug(f"Extracted content length: {len(content) if content else 0}")
            return content if content else ""
        except Exception as e:
            logger.error(f"Ollama API request error: {e}")
            raise Exception(f"Ollama API error: {e}") from e

    def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming generation"""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})
            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": True
            }

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
                stream=True
            )
            response.raise_for_status()

            chunks = []
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    # Each streamed line is a standalone JSON object
                    data = json.loads(line.decode('utf-8'))
                except json.JSONDecodeError:
                    continue
                content = data.get("message", {}).get("content", "")
                if content:
                    chunks.append(content)
            return chunks
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            raise


# Global instance
ollama_provider = OllamaProvider(config.local_model_name)