|
import time |
|
import logging |
|
from typing import List, Dict, Optional, Union, Tuple |
|
from src.llm.base_provider import LLMProvider |
|
from src.llm.hf_provider import HuggingFaceProvider |
|
from src.llm.ollama_provider import OllamaProvider |
|
from core.session import session_manager |
|
from utils.config import config |
|
|
|
# Module-level logger named after this module, so log records can be filtered
# per module by the application's logging configuration.
logger = logging.getLogger(__name__)
|
|
|
class MentorProvider(LLMProvider):
    """Mentor provider that uses HF as expert and Ollama as mentor/coach.

    ``generate`` asks the Hugging Face provider for the primary ("expert")
    answer, then asks the Ollama provider to analyse that answer, appends the
    parsed analysis to the expert text and records the exchange in the
    session store. Either sub-provider may be absent (``None``) when its
    configuration value is falsy or its constructor raised.
    """

    # Model requested from the Hugging Face provider for the expert role.
    HF_EXPERT_MODEL = "DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf"
    # Fixed timeouts handed to the two sub-providers.
    HF_TIMEOUT = 120
    OLLAMA_TIMEOUT = 60
    # Session key under which interactions are logged.
    SESSION_USER_ID = "default_user"
    # Only the most recent N interactions are kept in the session log.
    MAX_LOGGED_INTERACTIONS = 20
    # Number of trailing history messages quoted in the mentor prompt.
    CONTEXT_MESSAGES = 5
    # (insight key, rendered heading) in the order they appear in the output.
    _INSIGHT_SECTIONS = (
        ("thinking_analysis", "🧠 **Thinking Analysis**"),
        ("goal_progress", "🎯 **Goal Progress**"),
        ("follow_up_suggestions", "🤔 **Follow-up Suggestions**"),
        ("data_gathering", "📋 **Data to Gather**"),
        ("critical_insights", "💡 **Critical Insights**"),
    )

    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
        """Set up the expert and mentor sub-providers.

        Args:
            model_name: Forwarded to the ``LLMProvider`` initializer.
            timeout: Forwarded to the ``LLMProvider`` initializer. The
                sub-providers use their own fixed timeouts.
            max_retries: Forwarded to the ``LLMProvider`` initializer.

        Construction never raises because of a sub-provider: a failure is
        logged as a warning and the corresponding attribute stays ``None``.
        """
        super().__init__(model_name, timeout, max_retries)
        self.hf_provider = None
        self.ollama_provider = None
        self.conversation_analyzer = ConversationAnalyzer()

        # Expert: only created when an HF token is configured.
        try:
            if config.hf_token:
                self.hf_provider = HuggingFaceProvider(
                    model_name=self.HF_EXPERT_MODEL,
                    timeout=self.HF_TIMEOUT,
                )
        except Exception as e:
            logger.warning(f"Failed to initialize HF provider: {e}")

        # Mentor: only created when an Ollama host is configured.
        try:
            if config.ollama_host:
                self.ollama_provider = OllamaProvider(
                    model_name=config.local_model_name,
                    timeout=self.OLLAMA_TIMEOUT,
                )
        except Exception as e:
            logger.warning(f"Failed to initialize Ollama provider: {e}")

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate response using mentor approach.

        Args:
            prompt: The user's current message.
            conversation_history: Prior messages, passed through to the
                providers.

        Returns:
            The expert response, followed by a mentor-insights section when
            the mentor produced any. If the mentor pipeline fails, the result
            of a direct call to the HF provider is returned instead.

        Raises:
            Exception: When the mentor pipeline failed and the HF fallback
                is unavailable or also raised. The original failure is
                attached as ``__cause__``.
        """
        try:
            hf_response = self._get_expert_response(prompt, conversation_history)
            if not hf_response:
                raise Exception("HF Endpoint expert failed to provide response")

            mentor_insights = self._get_mentor_analysis(
                prompt,
                hf_response,
                conversation_history,
            )
            combined_response = self._combine_responses(hf_response, mentor_insights)

            # Best effort: _store_interaction logs and swallows its own errors.
            self._store_interaction(prompt, hf_response, mentor_insights)
            return combined_response

        except Exception as e:
            logger.error(f"Mentor generation failed: {e}")

            # Last resort: call the HF provider directly (this also acts as a
            # second attempt when the expert call itself was what failed).
            if self.hf_provider:
                try:
                    logger.info("Falling back to HF Endpoint only")
                    return self.hf_provider.generate(prompt, conversation_history)
                except Exception as fallback_error:
                    logger.error(f"HF fallback also failed: {fallback_error}")

            raise Exception(f"All providers failed: {str(e)}") from e

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Stream response using mentor approach.

        Streaming bypasses the mentor step entirely: the HF provider's
        ``stream_generate`` result is returned as-is.

        Raises:
            Exception: When no HF provider is available, or whatever the HF
                provider raises. The error is logged before being re-raised.
        """
        try:
            if not self.hf_provider:
                raise Exception("No HF provider available for streaming")
            return self.hf_provider.stream_generate(prompt, conversation_history)
        except Exception as e:
            logger.error(f"Mentor stream generation failed: {e}")
            raise

    def _get_expert_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get expert response from HF Endpoint.

        Returns:
            The HF provider's response, or ``None`` when there is no HF
            provider or the call raised (the error is logged, not propagated).
        """
        if not self.hf_provider:
            return None

        try:
            logger.info("🤖 Getting expert response from HF Endpoint...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("✅ HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None

    def _get_mentor_analysis(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> Dict:
        """Get mentor analysis and suggestions from Ollama.

        Returns:
            The insights dict produced by the conversation analyzer, or an
            empty dict when there is no Ollama provider or any step raised
            (the error is logged as a warning, not propagated).
        """
        if not self.ollama_provider:
            return {}

        try:
            logger.info("🐱 Getting mentor analysis from Ollama...")

            mentor_prompt = self._create_mentor_prompt(user_prompt, hf_response, conversation_history)

            # The mentor prompt already embeds the recent context, so the
            # mentor model is called with an empty history.
            mentor_response = self.ollama_provider.generate(mentor_prompt, [])

            insights = self.conversation_analyzer.parse_mentor_response(mentor_response)

            logger.info("✅ Ollama mentor analysis completed")
            return insights

        except Exception as e:
            logger.warning(f"Ollama mentor analysis failed: {e}")
            return {}

    def _create_mentor_prompt(self, user_prompt: str, hf_response: str, conversation_history: List[Dict]) -> str:
        """Create prompt for Ollama mentor to analyze interaction.

        The prompt quotes the user question, the expert response and the last
        ``CONTEXT_MESSAGES`` history entries, then asks for an analysis in
        five tagged sections.

        A ``None`` history and messages lacking ``role``/``content`` are
        tolerated, so a malformed history entry no longer aborts the whole
        mentor analysis.
        """
        recent_messages = (conversation_history or [])[-self.CONTEXT_MESSAGES:]
        conversation_context = "\n".join(
            f"{msg.get('role', 'unknown')}: {msg.get('content', '')}"
            for msg in recent_messages
            if isinstance(msg, dict)
        )

        # The literal below is flush-left on purpose: any indentation inside
        # the triple-quoted string would become part of the prompt.
        prompt = f"""
You are an AI mentor and conversation analyst. Your job is to analyze the interaction between a user and an expert AI, then provide insightful guidance.

ANALYZE THIS INTERACTION:
User Question: "{user_prompt}"
Expert Response: "{hf_response}"

Recent Conversation Context:
{conversation_context}

PROVIDE YOUR ANALYSIS IN THIS FORMAT:

<thinking_analysis>
Analyze the expert's reasoning approach, depth of analysis, and problem-solving methodology.
</thinking_analysis>

<goal_progress>
Assess how well this response advances toward the user's likely goals based on conversation history.
</goal_progress>

<follow_up_suggestions>
Provide 2-3 thoughtful follow-up questions or research directions that would deepen understanding.
</follow_up_suggestions>

<data_gathering>
Suggest what additional information or data would be valuable to collect.
</data_gathering>

<critical_insights>
Highlight any key insights, potential blind spots, or areas needing further exploration.
</critical_insights>

Keep your analysis concise but insightful. Focus on helping the user achieve their goals through better questioning and information gathering.
"""
        return prompt

    def _combine_responses(self, hf_response: str, mentor_insights: Dict) -> str:
        """Combine expert response with mentor insights.

        Returns:
            ``hf_response`` unchanged when there are no non-empty recognised
            insight sections; otherwise ``hf_response`` followed by a
            "Mentor Insights" header and the sections, each separated by one
            blank line.
        """
        if not mentor_insights:
            return hf_response

        sections = [
            f"{heading}\n{mentor_insights[key]}"
            for key, heading in self._INSIGHT_SECTIONS
            if mentor_insights.get(key)
        ]
        # Avoid emitting a header with nothing underneath it.
        if not sections:
            return hf_response

        insights_section = "\n\n--- 🎓 Mentor Insights ---\n\n" + "\n\n".join(sections)
        return f"{hf_response}{insights_section}"

    def _store_interaction(self, user_prompt: str, hf_response: str, mentor_insights: Dict):
        """Store interaction for learning and pattern recognition.

        Appends a record to the ``interaction_log`` list of the
        ``SESSION_USER_ID`` session and keeps only the newest
        ``MAX_LOGGED_INTERACTIONS`` records. Any failure is logged as a
        warning and otherwise ignored, so storage problems never affect the
        response returned to the user.
        """
        try:
            user_session = session_manager.get_session(self.SESSION_USER_ID)
            # `or []` also covers a key that is present but set to None.
            interaction_log = user_session.get("interaction_log") or []

            interaction = {
                "timestamp": time.time(),
                "user_prompt": user_prompt,
                "expert_response": hf_response,
                "mentor_insights": mentor_insights,
                # Number of records already in the log before this one.
                "conversation_length": len(interaction_log),
            }

            interaction_log.append(interaction)
            user_session["interaction_log"] = interaction_log[-self.MAX_LOGGED_INTERACTIONS:]
            session_manager.update_session(self.SESSION_USER_ID, user_session)

        except Exception as e:
            logger.warning(f"Failed to store interaction: {e}")
|
|
|
class ConversationAnalyzer:
    """Analyzes conversation patterns and provides insights.

    Turns the mentor model's free-text reply, which is expected to wrap each
    part of its analysis in ``<name>...</name>`` tags, into a dict keyed by
    section name.
    """

    # Sections looked for in a mentor reply, in output order.
    SECTION_NAMES = (
        "thinking_analysis",
        "goal_progress",
        "follow_up_suggestions",
        "data_gathering",
        "critical_insights",
    )

    def parse_mentor_response(self, mentor_response: str) -> Dict:
        """Parse mentor response into structured insights.

        Args:
            mentor_response: Raw text returned by the mentor model; may be
                empty or ``None``.

        Returns:
            A dict mapping each section name found in the text to its
            stripped content. Sections that are missing, unterminated or
            blank are omitted; an empty/``None`` input yields ``{}``.
        """
        if not mentor_response:
            return {}

        insights = {}
        for section_name in self.SECTION_NAMES:
            extracted = self._extract_section(mentor_response, section_name)
            cleaned = extracted.strip() if extracted else ""
            if cleaned:
                insights[section_name] = cleaned
        return insights

    def _extract_section(self, text: str, section_name: str) -> Optional[str]:
        """Extract specific section from mentor response.

        Args:
            text: Text to search.
            section_name: Tag name without angle brackets.

        Returns:
            The stripped text between the first ``<section_name>`` and the
            first ``</section_name>`` that follows it, or ``None`` when
            either tag is absent.
        """
        _, opening, remainder = text.partition(f"<{section_name}>")
        if not opening:
            return None

        # Only a closing tag *after* the opening tag counts.
        body, closing, _ = remainder.partition(f"</{section_name}>")
        if not closing:
            return None

        return body.strip()
|
|
|
|
|
# Shared module-level instance. It is constructed at import time, so importing
# this module runs MentorProvider.__init__ (which reads `config` and may
# create the HF and Ollama providers).
mentor_provider = MentorProvider("mentor_model")
|
|