import asyncio
import json
import logging
import os
from datetime import datetime
from typing import List, Dict, Optional, AsyncGenerator

from core.llm_factory import llm_factory
from core.session import session_manager
from services.hf_endpoint_monitor import hf_monitor
from services.weather import weather_service

try:
    from tavily import TavilyClient
    TAVILY_AVAILABLE = True
except ImportError:
    TavilyClient = None
    TAVILY_AVAILABLE = False

logger = logging.getLogger(__name__)


class AICoordinator:
    """Hierarchical multi-model coordinator with the HF endpoint as the authoritative layer."""

    def __init__(self):
        # Optional Tavily web-search client; only created when the SDK and API key are present.
        self.tavily_client = None
        if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
            self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

        # Role prompts injected as system messages by the response helpers below.
        self.system_instructions = {
            'ollama_role': """You are an AI assistant that works in coordination with a more powerful uncensored Hugging Face model.

Your role is to:
1. Provide immediate, concise responses using available information
2. Summarize and simplify complex information
3. Enhance responses with local knowledge
4. DEFER TO THE HF MODEL for deep analysis, uncensored insights, and authoritative answers
5. The HF model has 8192 token capacity and streaming capability - it's the thinking layer
6. Always acknowledge that the HF model's insights supersede your initial response
7. Use web sources to justify points when possible
8. Keep responses brief and focused while awaiting HF input""",

            'hf_role': """You are an uncensored, authoritative AI with 8192 token capacity and streaming capability.
Your role is to:
1. Provide deep, uncensored analysis and insights
2. Supersede the local Ollama model's logic when there's disagreement
3. Think thoroughly and provide comprehensive responses
4. Use your full 8192 token capacity for detailed analysis
5. Stream your responses for real-time delivery
6. Build upon the conversation context and Ollama's initial response
7. Provide authoritative answers that take precedence"""
        }

    def determine_web_search_needs(self, conversation_history: List[Dict]) -> Dict:
        """Determine whether a web search is needed based on conversation content."""
        conversation_text = " ".join([msg.get("content", "") for msg in conversation_history])

        current_info_indicators = [
            "news", "current events", "latest", "recent", "today",
            "weather", "temperature", "forecast",
            "stock", "price", "trend", "market",
            "breaking", "update", "development"
        ]

        needs_search = False
        search_topics = []

        for indicator in current_info_indicators:
            if indicator in conversation_text.lower():
                needs_search = True
                search_topics.append(indicator)

        return {
            "needs_search": needs_search,
            "search_topics": search_topics,
            "reasoning": f"Found topics requiring current info: {', '.join(search_topics)}" if search_topics else "No current info needed"
        }
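
    # Example (sketch): a conversation mentioning "latest weather" would yield
    #   {"needs_search": True, "search_topics": ["latest", "weather"],
    #    "reasoning": "Found topics requiring current info: latest, weather"}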

    def manual_hf_analysis(self, user_id: str, conversation_history: List[Dict]) -> str:
        """Perform a manual HF analysis pass with web-search awareness."""
        try:
            research_decision = self.determine_web_search_needs(conversation_history)

            system_prompt = f"""
You are a deep analysis expert joining an ongoing conversation.

Research Decision: {research_decision['reasoning']}

Please provide:
1. Deep insights on conversation themes
2. Research/web search needs (if any)
3. Strategic recommendations
4. Questions to explore further

Conversation History:
"""

            messages = [{"role": "system", "content": system_prompt}]

            # Include only the most recent turns to stay within context limits.
            for msg in conversation_history[-15:]:
                messages.append({
                    "role": msg["role"],
                    "content": msg["content"]
                })

            hf_provider = llm_factory.get_provider('huggingface')

            if hf_provider:
                response = hf_provider.generate("Deep analysis request", messages)
                return response or "HF Expert analysis completed."
            else:
                return "❌ HF provider not available."

        except Exception as e:
            logger.error(f"Manual HF analysis failed: {e}")
            return f"❌ HF analysis failed: {str(e)}"

    def get_hf_engagement_status(self) -> Dict:
        """Get current HF engagement status."""
        return {
            "hf_available": self._check_hf_availability(),
            "web_search_configured": bool(self.tavily_client),
            "research_needs_detected": False,
            "last_hf_analysis": None
        }

    async def coordinate_hierarchical_conversation(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """
        Coordinate a full hierarchical exchange, yielding progress and response events.

        Yields dicts with a 'type' of 'coordination_status', 'initial_response',
        'hf_thinking', or 'final_response', plus 'content' and optional 'details'.
        """
        try:
            session = session_manager.get_session(user_id)
            conversation_history = session.get("conversation", []).copy()

            yield {
                'type': 'coordination_status',
                'content': '🚀 Initiating hierarchical AI coordination...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query)
                }
            }

            # Phase 1: gather external context (web search, weather, current time).
            yield {
                'type': 'coordination_status',
                'content': '🔍 Gathering external context...',
                'details': {'phase': 'external_data_gathering'}
            }
            external_data = await self._gather_external_data(user_query)

            if external_data:
                data_summary = []
                if 'search_results' in external_data:
                    data_summary.append(f"Web search: {len(external_data['search_results'])} results")
                if 'weather' in external_data:
                    data_summary.append("Weather data: available")
                if 'current_datetime' in external_data:
                    data_summary.append(f"Time: {external_data['current_datetime']}")

                yield {
                    'type': 'coordination_status',
                    'content': f'📊 External data gathered: {", ".join(data_summary)}',
                    'details': {'external_data_summary': data_summary}
                }

            # Phase 2: get the immediate response from the local Ollama model.
            yield {
                'type': 'coordination_status',
                'content': '🦙 Getting initial response from Ollama...',
                'details': {'phase': 'ollama_response'}
            }
            ollama_response = await self._get_hierarchical_ollama_response(
                user_query, conversation_history, external_data
            )

            yield {
                'type': 'initial_response',
                'content': ollama_response,
                'details': {
                    'response_length': len(ollama_response),
                    'external_data_injected': bool(external_data)
                }
            }

            # Phase 3: engage the HF endpoint as the authoritative layer, if available.
            yield {
                'type': 'coordination_status',
                'content': '🤗 Engaging HF endpoint for deep analysis...',
                'details': {'phase': 'hf_coordination'}
            }

            hf_available = self._check_hf_availability()
            if hf_available:
                context_summary = {
                    'conversation_turns': len(conversation_history),
                    'ollama_response_length': len(ollama_response),
                    'external_data_items': len(external_data) if external_data else 0
                }

                yield {
                    'type': 'coordination_status',
                    'content': f'📋 HF context: {len(conversation_history)} conversation turns, Ollama response ({len(ollama_response)} chars)',
                    'details': context_summary
                }

                async for hf_chunk in self._coordinate_hierarchical_hf_response(
                    user_id, user_query, conversation_history,
                    external_data, ollama_response
                ):
                    yield hf_chunk
            else:
                yield {
                    'type': 'coordination_status',
                    'content': 'ℹ️ HF endpoint not available - using Ollama response',
                    'details': {'hf_available': False}
                }

            yield {
                'type': 'coordination_status',
                'content': '✅ Hierarchical coordination complete',
                'details': {'status': 'complete'}
            }

        except Exception as e:
            logger.error(f"Hierarchical coordination failed: {e}")
            yield {
                'type': 'coordination_status',
                'content': f'❌ Coordination error: {str(e)}',
                'details': {'error': str(e)}
            }

    async def _coordinate_hierarchical_hf_response(self, user_id: str, query: str,
                                                   history: List, external_data: Dict,
                                                   ollama_response: str) -> AsyncGenerator[Dict, None]:
        """Coordinate with the HF endpoint as the authoritative layer, streaming its response."""
        try:
            hf_status = hf_monitor.check_endpoint_status()

            if not hf_status['available']:
                yield {'type': 'coordination_status', 'content': '⚡ Initializing HF endpoint (2-4 minutes)...'}
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    yield {'type': 'coordination_status', 'content': '❌ HF endpoint initialization failed'}
                    return

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                yield {'type': 'coordination_status', 'content': '❌ HF provider not available'}
                return

            enhanced_history = history.copy()

            # Lead with the authoritative HF role prompt.
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['hf_role']
            })

            # Append the coordination context: user query, Ollama's draft, and any external data.
            enhanced_history.append({
                "role": "system",
                "content": f"""Context for your authoritative analysis:
User Query: '{query}'
Ollama's Initial Response: '{ollama_response}'
External Data Available: {json.dumps(external_data, indent=2) if external_data else 'None'}

Please provide your deep, uncensored analysis that supersedes the local model's response.
Use your full 8192 token capacity for comprehensive thinking.
Stream your response for real-time delivery."""
            })

            enhanced_history.append({"role": "user", "content": query})

            yield {'type': 'coordination_status', 'content': '🧠 HF endpoint thinking...'}

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)

            if hf_response_stream:
                # Relay chunks as they arrive, then emit the assembled response as final.
                full_hf_response = ""
                for chunk in hf_response_stream:
                    if chunk:
                        full_hf_response += chunk
                        yield {'type': 'hf_thinking', 'content': chunk}

                yield {'type': 'final_response', 'content': full_hf_response}
                yield {'type': 'coordination_status', 'content': '🎯 HF analysis complete and authoritative'}
            else:
                yield {'type': 'coordination_status', 'content': '❌ HF response generation failed'}

        except Exception as e:
            logger.error(f"Hierarchical HF coordination failed: {e}")
            yield {'type': 'coordination_status', 'content': f'❌ HF coordination error: {str(e)}'}

    async def _get_hierarchical_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
        """Get the Ollama response with hierarchical awareness."""
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise RuntimeError("Ollama provider not available")

            enhanced_history = history.copy()

            # Lead with the subordinate Ollama role prompt.
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role']
            })

            # Inject external context (search answer, weather, current time) as a second system message.
            if external_data:
                context_parts = []
                if 'search_answer' in external_data:
                    context_parts.append(f"Current information: {external_data['search_answer']}")
                if 'weather' in external_data:
                    weather = external_data['weather']
                    context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
                if 'current_datetime' in external_data:
                    context_parts.append(f"Current time: {external_data['current_datetime']}")

                if context_parts:
                    context_message = {
                        "role": "system",
                        "content": "Context: " + " | ".join(context_parts)
                    }
                    enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)

            if response:
                return f"{response}\n\n*Note: A more comprehensive analysis from the uncensored HF model is being prepared...*"
            else:
                return "I'm processing your request... A deeper analysis is being prepared by the authoritative model."

        except Exception as e:
            logger.error(f"Hierarchical Ollama response failed: {e}")
            return "I'm thinking about your question... Preparing a comprehensive response."

    def _check_hf_availability(self) -> bool:
        """Check whether the HF endpoint is configured and available."""
        try:
            from utils.config import config
            return bool(config.hf_token and config.hf_api_url)
        except Exception:
            return False

    async def _gather_external_data(self, query: str) -> Dict:
        """Gather external data (web search, weather, current time) for the query."""
        data = {}

        # Web search via Tavily, when configured.
        if self.tavily_client:
            try:
                search_result = self.tavily_client.search(
                    f"current information about {query}",
                    max_results=5,
                    include_answer=True,
                    include_raw_content=True
                )
                data['search_results'] = search_result.get('results', [])
                if search_result.get('answer'):
                    data['search_answer'] = search_result['answer']

                data['raw_sources'] = [(result.get('raw_content') or '')[:1000] for result in search_result.get('results', [])[:3]]
            except Exception as e:
                logger.warning(f"Tavily search failed: {e}")

        # Weather lookup when the query appears weather-related.
        weather_keywords = ['weather', 'temperature', 'forecast', 'climate', 'rain', 'sunny']
        if any(keyword in query.lower() for keyword in weather_keywords):
            try:
                location = self._extract_location(query) or "New York"
                weather = weather_service.get_current_weather(location)
                if weather:
                    data['weather'] = weather
            except Exception as e:
                logger.warning(f"Weather data failed: {e}")

        # Always include the current timestamp.
        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        return data
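
    # Example (sketch) of the returned shape when both Tavily and weather data are available;
    # the values shown are placeholders:
    #   {"search_results": [...], "search_answer": "...", "raw_sources": ["...", ...],
    #    "weather": {...}, "current_datetime": "2025-01-01 12:00:00"}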

    def _extract_location(self, query: str) -> Optional[str]:
        """Extract a known location from the query, defaulting to New York."""
        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney',
                     'Los Angeles', 'Chicago', 'Miami', 'Seattle', 'Boston',
                     'San Francisco', 'Toronto', 'Vancouver', 'Montreal']
        for loc in locations:
            if loc.lower() in query.lower():
                return loc
        return "New York"

    def get_coordination_status(self) -> Dict:
        """Get current coordination system status."""
        return {
            'tavily_available': self.tavily_client is not None,
            'weather_available': weather_service.api_key is not None,
            'web_search_enabled': self.tavily_client is not None,
            'external_apis_configured': any([
                weather_service.api_key,
                os.getenv("TAVILY_API_KEY"),
                os.getenv("NASA_API_KEY")
            ])
        }

    def get_recent_activities(self, user_id: str) -> Dict:
        """Get recent coordination activity counters for a user."""
        try:
            session = session_manager.get_session(user_id)
            coord_stats = session.get('ai_coordination', {})
            return {
                'last_request': coord_stats.get('last_coordination'),
                'requests_processed': coord_stats.get('requests_processed', 0),
                'ollama_responses': coord_stats.get('ollama_responses', 0),
                'hf_responses': coord_stats.get('hf_responses', 0)
            }
        except Exception:
            return {}


# Shared module-level coordinator instance.
coordinator = AICoordinator()
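

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the coordinator API: it assumes the
    # Ollama/HF providers, session manager, and any API keys are already configured.
    async def _demo():
        async for event in coordinator.coordinate_hierarchical_conversation(
            "demo-user", "What's the latest weather in London?"
        ):
            print(event["type"], event["content"])

    asyncio.run(_demo())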