import requests
import logging
import re
import json
from typing import List, Dict, Optional, Union
from core.providers.base import LLMProvider
from utils.config import config
logger = logging.getLogger(__name__)
class OllamaProvider(LLMProvider):
"""Ollama LLM provider implementation with commentary support"""
    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 3):  # 120s default to tolerate slow local model loads
super().__init__(model_name, timeout, max_retries)
self.host = self._sanitize_host(config.ollama_host or "http://localhost:11434")
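        # Extra headers: ngrok-skip-browser-warning bypasses ngrok's interstitial
        # warning page when the Ollama host is exposed through an ngrok tunnel.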
self.headers = {
"ngrok-skip-browser-warning": "true",
"User-Agent": "CosmicCat-AI-Assistant"
}
def _sanitize_host(self, host: str) -> str:
"""Sanitize host URL by removing whitespace and control characters"""
if not host:
return "http://localhost:11434"
host = host.strip()
host = re.sub(r'[\r\n\t\0]+', '', host)
if not host.startswith(('http://', 'https://')):
host = 'http://' + host
return host
def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
"""Generate a response synchronously"""
try:
return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
except Exception as e:
logger.error(f"Ollama generation failed: {e}")
return None
def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
"""Generate a response with streaming support"""
try:
return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
except Exception as e:
logger.error(f"Ollama stream generation failed: {e}")
return None
def validate_model(self) -> bool:
"""Validate if the model is available"""
try:
response = requests.get(
f"{self.host}/api/tags",
headers=self.headers,
timeout=self.timeout
)
if response.status_code == 200:
models = response.json().get("models", [])
                model_names = [model.get("name", "") for model in models]
                # Ollama lists tagged names (e.g. "llama3:latest"), so accept an
                # exact match or a match on the untagged base name.
                base_names = [name.split(":")[0] for name in model_names]
                return self.model_name in model_names or self.model_name in base_names
            elif response.status_code == 404:
                # /api/tags not available (e.g. behind a proxy); fall back to a plain
                # reachability check and assume the model can be used.
                response2 = requests.get(
                    f"{self.host}",
                    headers=self.headers,
                    timeout=self.timeout
                )
                return response2.status_code == 200
return False
except Exception as e:
logger.warning(f"Model validation failed: {e}")
return False
def generate_commentary(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> Optional[str]:
"""Generate commentary on HF response"""
try:
commentary_prompt = self._create_commentary_prompt(user_prompt, hf_response, conversation_history)
return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
except Exception as e:
logger.error(f"Ollama commentary generation failed: {e}")
return None
def generate_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
"""Generate self-commentary on own response"""
try:
commentary_prompt = self._create_self_commentary_prompt(user_prompt, ollama_response, conversation_history)
return self._retry_with_backoff(self._generate_impl, commentary_prompt, [])
except Exception as e:
logger.error(f"Ollama self-commentary generation failed: {e}")
return None
def _create_commentary_prompt(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> str:
"""Create prompt for Ollama to comment on HF response"""
conversation_context = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in conversation_history[-3:] # Last 3 messages for context
])
prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze the interaction between a user and an expert AI, then provide insightful commentary.
ANALYZE THIS INTERACTION:
User Question: "{user_prompt}"
Expert Response: "{hf_response}"
Recent Conversation Context: {conversation_context}
PROVIDE YOUR COMMENTARY IN THIS FORMAT:
I've reviewed the HF expert's response and here's my insight:
Key Points Observed:
[Point 1]
[Point 2]
My Perspective: [Your commentary on the HF response]
Suggestions:
[Suggestion 1]
[Suggestion 2]
Keep your analysis concise but insightful. Focus on helping the user achieve their goals through better questioning and information gathering.
"""
return prompt
def _create_self_commentary_prompt(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> str:
"""Create prompt for Ollama to comment on its own response"""
conversation_context = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in conversation_history[-3:] # Last 3 messages for context
])
prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze your own response to a user question, then provide insightful self-reflection.
ANALYZE YOUR RESPONSE:
User Question: "{user_prompt}"
Your Response: "{ollama_response}"
Recent Conversation Context: {conversation_context}
PROVIDE YOUR SELF-COMMENTARY IN THIS FORMAT:
I've reviewed my own response and here's my self-reflection:
Key Points Addressed:
[Point 1]
[Point 2]
My Self-Assessment: [Your reflection on your own response quality]
Areas for Improvement:
[Area 1]
[Area 2]
Keep your analysis honest and constructive. Focus on how you could have provided better assistance.
"""
return prompt
def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
"""Implementation of synchronous generation"""
try:
url = f"{self.host}/api/chat"
messages = conversation_history.copy()
messages.append({"role": "user", "content": prompt})
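            # Non-streaming call to Ollama's /api/chat endpoint; with "stream": False
            # the server returns one JSON object whose "message" field holds the reply.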
payload = {
"model": self.model_name,
"messages": messages,
"stream": False
}
logger.info(f"Ollama request URL: {url}")
logger.info(f"Ollama request payload: {payload}")
logger.info(f"Ollama headers: {self.headers}")
response = requests.post(
url,
json=payload,
headers=self.headers,
timeout=self.timeout
)
logger.info(f"Ollama response status: {response.status_code}")
logger.info(f"Ollama response headers: {dict(response.headers)}")
response.raise_for_status()
result = response.json()
logger.info(f"Ollama response body: {result}")
content = None
if "message" in result and "content" in result["message"]:
content = result["message"]["content"]
elif "response" in result:
content = result["response"]
else:
content = str(result)
logger.info(f"Extracted content length: {len(content) if content else 0}")
return content if content else ""
        except Exception as e:
            logger.error(f"Ollama API request error: {e}")
            raise RuntimeError(f"Ollama API error: {e}") from e
def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
"""Implementation of streaming generation"""
try:
url = f"{self.host}/api/chat"
messages = conversation_history.copy()
messages.append({"role": "user", "content": prompt})
payload = {
"model": self.model_name,
"messages": messages,
"stream": True
}
response = requests.post(
url,
json=payload,
headers=self.headers,
timeout=self.timeout,
stream=True
)
response.raise_for_status()
            chunks = []
            # Ollama streams newline-delimited JSON objects; parse each line with
            # json.loads instead of eval() and skip anything that is not valid JSON.
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    data = json.loads(line.decode("utf-8"))
                except (json.JSONDecodeError, UnicodeDecodeError):
                    continue
                content = data.get("message", {}).get("content", "")
                if content:
                    chunks.append(content)
            return chunks
except Exception as e:
logger.error(f"Ollama stream generation failed: {e}")
raise
# Global instance
ollama_provider = OllamaProvider(config.local_model_name)
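
# Minimal usage sketch (assumes an Ollama server is reachable at the configured
# host and that config.local_model_name refers to a model that has been pulled):
if __name__ == "__main__":
    if ollama_provider.validate_model():
        print(ollama_provider.generate("Say hello in one short sentence.", []))
    else:
        print(f"Model '{ollama_provider.model_name}' not available at {ollama_provider.host}")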