import json
import logging
import re
from typing import List, Dict, Optional, Union

import requests

from core.providers.base import LLMProvider
from utils.config import config

logger = logging.getLogger(__name__)


class OllamaProvider(LLMProvider):
    """Ollama LLM provider implementation with commentary support."""

    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 3):
        super().__init__(model_name, timeout, max_retries)
        self.host = self._sanitize_host(config.ollama_host or "http://localhost:11434")
        # The ngrok header skips ngrok's browser-warning interstitial when the
        # Ollama host is exposed through an ngrok tunnel; it is harmless otherwise.
        self.headers = {
            "ngrok-skip-browser-warning": "true",
            "User-Agent": "CosmicCat-AI-Assistant"
        }

    def _sanitize_host(self, host: str) -> str:
        """Sanitize the host URL by removing whitespace and control characters."""
        if not host:
            return "http://localhost:11434"
        host = host.strip()
        host = re.sub(r'[\r\n\t\0]+', '', host)
        if not host.startswith(('http://', 'https://')):
            host = 'http://' + host
        return host

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a response synchronously."""
        try:
            return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama generation failed: {e}")
            return None

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate a response with streaming support."""
        try:
            return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            return None

    def validate_model(self) -> bool:
        """Check whether the configured model is available on the Ollama host."""
        try:
            response = requests.get(
                f"{self.host}/api/tags",
                headers=self.headers,
                timeout=self.timeout
            )
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [model.get("name") for model in models]
                return self.model_name in model_names
            elif response.status_code == 404:
                # Fallback: if /api/tags is not exposed (e.g. behind a proxy),
                # treat a reachable host root as "available".
                response2 = requests.get(
                    f"{self.host}",
                    headers=self.headers,
                    timeout=self.timeout
                )
                return response2.status_code == 200
            return False
        except Exception as e:
            logger.warning(f"Model validation failed: {e}")
            return False

    def generate_commentary(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate commentary on an HF response."""
        try:
            commentary_prompt = self._create_commentary_prompt(user_prompt, hf_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama commentary generation failed: {e}")
            return None

    def generate_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate self-commentary on the provider's own response."""
        try:
            commentary_prompt = self._create_self_commentary_prompt(user_prompt, ollama_response, conversation_history)
            return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
        except Exception as e:
            logger.error(f"Ollama self-commentary generation failed: {e}")
            return None

    def _create_commentary_prompt(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> str:
        """Create a prompt for Ollama to comment on the HF response."""
        conversation_context = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]
        ])

        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze the interaction between a user and an expert AI, then provide insightful commentary.

ANALYZE THIS INTERACTION:
User Question: "{user_prompt}"
Expert Response: "{hf_response}"

Recent Conversation Context: {conversation_context}

PROVIDE YOUR COMMENTARY IN THIS FORMAT:

I've reviewed the HF expert's response and here's my insight:

Key Points Observed:
[Point 1]
[Point 2]

My Perspective: [Your commentary on the HF response]

Suggestions:
[Suggestion 1]
[Suggestion 2]

Keep your analysis concise but insightful. Focus on helping the user achieve their goals through better questioning and information gathering.
"""
        return prompt

    def _create_self_commentary_prompt(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> str:
        """Create a prompt for Ollama to comment on its own response."""
        conversation_context = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-3:]
        ])

        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze your own response to a user question, then provide insightful self-reflection.

ANALYZE YOUR RESPONSE:
User Question: "{user_prompt}"
Your Response: "{ollama_response}"

Recent Conversation Context: {conversation_context}

PROVIDE YOUR SELF-COMMENTARY IN THIS FORMAT:

I've reviewed my own response and here's my self-reflection:

Key Points Addressed:
[Point 1]
[Point 2]

My Self-Assessment: [Your reflection on your own response quality]

Areas for Improvement:
[Area 1]
[Area 2]

Keep your analysis honest and constructive. Focus on how you could have provided better assistance.
"""
        return prompt

    def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Implementation of synchronous generation."""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})

            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": False
            }

            logger.info(f"Ollama request URL: {url}")
            logger.info(f"Ollama request payload: {payload}")
            logger.info(f"Ollama headers: {self.headers}")

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout
            )

            logger.info(f"Ollama response status: {response.status_code}")
            logger.info(f"Ollama response headers: {dict(response.headers)}")

            response.raise_for_status()
            result = response.json()
            logger.info(f"Ollama response body: {result}")

            # /api/chat returns the text under message.content; fall back to the
            # /api/generate-style "response" field, then to the raw payload.
            content = None
            if "message" in result and "content" in result["message"]:
                content = result["message"]["content"]
            elif "response" in result:
                content = result["response"]
            else:
                content = str(result)

            logger.info(f"Extracted content length: {len(content) if content else 0}")
            return content if content else ""

        except Exception as e:
            logger.error(f"Ollama API request error: {e}")
            raise Exception(f"Ollama API error: {e}") from e

    def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming generation."""
        try:
            url = f"{self.host}/api/chat"
            messages = conversation_history.copy()
            messages.append({"role": "user", "content": prompt})

            payload = {
                "model": self.model_name,
                "messages": messages,
                "stream": True
            }

            response = requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
                stream=True
            )
            response.raise_for_status()

            # Ollama streams newline-delimited JSON objects; each line carries a
            # partial message under message.content.
            chunks = []
            for line in response.iter_lines():
                if line:
                    chunk = line.decode('utf-8')
                    try:
                        data = json.loads(chunk)
                    except json.JSONDecodeError:
                        continue
                    content = data.get("message", {}).get("content", "")
                    if content:
                        chunks.append(content)
            return chunks
        except Exception as e:
            logger.error(f"Ollama stream generation failed: {e}")
            raise


ollama_provider = OllamaProvider(config.local_model_name)
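
# Minimal usage sketch, not part of the provider API: it assumes an Ollama
# server is reachable at config.ollama_host and that config.local_model_name
# names a model that has already been pulled (e.g. with `ollama pull <model>`).
if __name__ == "__main__":
    if ollama_provider.validate_model():
        # Empty conversation history; the prompt here is purely illustrative.
        print(ollama_provider.generate("Say hello in one sentence.", []))
    else:
        print(f"Model {ollama_provider.model_name} is not available at {ollama_provider.host}")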