import logging
import time
from typing import Dict, Optional

import requests

from utils.config import config

logger = logging.getLogger(__name__)


class HFEndpointMonitor:
    """Monitor Hugging Face endpoint status and health."""

    def __init__(self):
        # Clean the endpoint URL
        raw_url = config.hf_api_url or ""
        self.endpoint_url = self._clean_endpoint_url(raw_url)
        self.hf_token = config.hf_token
        self.is_initialized = False
        self.last_check = 0
        self.check_interval = 300  # seconds between full checks (5 minutes)
        self.warmup_attempts = 0
        self.max_warmup_attempts = 3
        self.warmup_count = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.avg_response_time = 0
        logger.info(f"Initialized HF Monitor with URL: {self.endpoint_url}")

    def _clean_endpoint_url(self, url: str) -> str:
        """Clean and validate the endpoint URL."""
        if not url:
            return ""

        # Remove environment variable names if present
        url = url.replace('hf_api_endpoint_url=', '')
        url = url.replace('HF_API_ENDPOINT_URL=', '')

        # Strip whitespace
        url = url.strip()

        # Ensure the URL has a scheme
        if url and not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Remove trailing slashes but keep /v1 if present
        url = url.rstrip('/')

        return url

    def check_endpoint_status(self) -> Dict:
        """Check if the HF endpoint is available and initialized, with rate limiting."""
        current_time = time.time()

        # Don't check too frequently - minimum 60 seconds between live checks
        if current_time - self.last_check < 60:
            # Return the cached status
            return {
                'available': getattr(self, '_last_available', False),
                'status_code': getattr(self, '_last_status_code', None),
                'initialized': getattr(self, '_last_initialized', False),
                'timestamp': self.last_check
            }

        # Proceed with an actual check
        self.last_check = current_time
        try:
            if not self.endpoint_url or not self.hf_token:
                status_info = {
                    'available': False,
                    'status_code': None,
                    'initialized': False,
                    'error': 'URL or token not configured',
                    'timestamp': time.time()
                }
            else:
                # Properly construct the models endpoint URL
                models_url = f"{self.endpoint_url.rstrip('/')}/models"
                logger.info(f"Checking HF endpoint at: {models_url}")

                headers = {"Authorization": f"Bearer {self.hf_token}"}
                response = requests.get(models_url, headers=headers, timeout=15)

                status_info = {
                    'available': response.status_code in [200, 201],
                    'status_code': response.status_code,
                    'initialized': self._is_endpoint_initialized(response),
                    'response_time': response.elapsed.total_seconds(),
                    'timestamp': time.time()
                }

                if response.status_code not in [200, 201]:
                    status_info['error'] = f"HTTP {response.status_code}: {response.text[:200]}"

            logger.info(f"HF Endpoint Status: {status_info}")

            # Cache the results
            self._last_available = status_info['available']
            self._last_status_code = status_info['status_code']
            self._last_initialized = status_info.get('initialized', False)

            return status_info

        except Exception as e:
            error_msg = str(e)
            logger.error(f"HF endpoint check failed: {error_msg}")
            status_info = {
                'available': False,
                'status_code': None,
                'initialized': False,
                'error': error_msg,
                'timestamp': time.time()
            }

            # Cache the results
            self._last_available = False
            self._last_status_code = None
            self._last_initialized = False

            return status_info

    def _is_endpoint_initialized(self, response) -> bool:
        """Determine if the endpoint is fully initialized."""
        try:
            data = response.json()
            return 'data' in data or 'models' in data
        except Exception:
            return response.status_code in [200, 201]

    def warm_up_endpoint(self) -> bool:
"""Send a warm-up request to initialize the endpoint""" try: if not self.endpoint_url or not self.hf_token: logger.warning("Cannot warm up HF endpoint - URL or token not configured") return False self.warmup_attempts += 1 logger.info(f"Warming up HF endpoint (attempt {self.warmup_attempts})...") headers = { "Authorization": f"Bearer {self.hf_token}", "Content-Type": "application/json" } # Construct proper chat completions URL chat_url = f"{self.endpoint_url.rstrip('/')}/chat/completions" logger.info(f"Sending warm-up request to: {chat_url}") payload = { "model": "DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf", "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 10, "stream": False } response = requests.post( chat_url, headers=headers, json=payload, timeout=45 # Longer timeout for cold start ) success = response.status_code in [200, 201] if success: self.is_initialized = True self.warmup_count += 1 self.warmup_attempts = 0 # Reset on success logger.info("✅ HF endpoint warmed up successfully") else: logger.warning(f"⚠️ HF endpoint warm-up response: {response.status_code}") logger.debug(f"Response body: {response.text[:500]}") return success except Exception as e: logger.error(f"HF endpoint warm-up failed: {e}") self.failed_requests += 1 return False def get_status_summary(self) -> str: """Get human-readable status summary""" status = self.check_endpoint_status() if status['available']: if status.get('initialized', False): return "🟢 HF Endpoint: Available and Initialized" else: return "🟡 HF Endpoint: Available but Initializing" else: return "🔴 HF Endpoint: Unavailable" def handle_scale_to_zero(self) -> bool: """Handle scale-to-zero behavior with user feedback""" logger.info("HF endpoint appears to be scaled to zero. Attempting to wake it up...") # Try to warm up the endpoint for attempt in range(self.max_warmup_attempts): logger.info(f"Wake-up attempt {attempt + 1}/{self.max_warmup_attempts}") if self.warm_up_endpoint(): logger.info("✅ HF endpoint successfully woken up!") return True time.sleep(10) # Wait between attempts logger.error("❌ Failed to wake up HF endpoint after all attempts") return False def get_detailed_status(self) -> Dict: """Get detailed HF endpoint status with metrics""" try: headers = {"Authorization": f"Bearer {self.hf_token}"} # Get model info models_url = f"{self.endpoint_url.rstrip('/')}/models" model_response = requests.get( models_url, headers=headers, timeout=10 ) # Get endpoint info if available endpoint_info = {} try: info_url = f"{self.endpoint_url.rstrip('/')}/info" info_response = requests.get( info_url, headers=headers, timeout=10 ) if info_response.status_code == 200: endpoint_info = info_response.json() except: pass status_info = { 'available': model_response.status_code == 200, 'status_code': model_response.status_code, 'initialized': self._is_endpoint_initialized(model_response), 'endpoint_info': endpoint_info, 'last_checked': time.time(), 'warmup_attempts': getattr(self, 'warmup_attempts', 0), 'is_warming_up': getattr(self, 'is_warming_up', False) } return status_info except Exception as e: return { 'available': False, 'status_code': None, 'initialized': False, 'error': str(e), 'last_checked': time.time() } def get_performance_metrics(self) -> Dict: """Get HF endpoint performance metrics""" return { 'warmup_count': getattr(self, 'warmup_count', 0), 'successful_requests': getattr(self, 'successful_requests', 0), 'failed_requests': getattr(self, 'failed_requests', 0), 'average_response_time': getattr(self, 
    # Enhanced status tracking methods
    def get_enhanced_status(self) -> Dict:
        """Get enhanced HF endpoint status with engagement tracking."""
        basic_status = self.check_endpoint_status()
        return {
            **basic_status,
            "engagement_level": self._determine_engagement_level(),
            "last_engagement": getattr(self, '_last_engagement_time', None),
            "total_engagements": getattr(self, '_total_engagements', 0),
            "current_research_topic": getattr(self, '_current_research_topic', None)
        }

    def _determine_engagement_level(self) -> str:
        """Determine the current engagement level."""
        if not self.is_initialized:
            return "idle"
        elif getattr(self, '_currently_analyzing', False):
            return "analyzing"
        elif getattr(self, '_pending_research', False):
            return "research_pending"
        else:
            return "ready"

    def start_hf_analysis(self, topic: Optional[str] = None):
        """Start HF analysis with topic tracking."""
        self._currently_analyzing = True
        self._last_engagement_time = time.time()
        self._total_engagements = getattr(self, '_total_engagements', 0) + 1
        if topic:
            self._current_research_topic = topic

    def finish_hf_analysis(self):
        """Finish HF analysis."""
        self._currently_analyzing = False
        self._current_research_topic = None


# Global instance
hf_monitor = HFEndpointMonitor()
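

# Minimal usage sketch for local testing of the shared `hf_monitor` instance.
# It assumes `utils.config` supplies valid `hf_api_url` and `hf_token` values;
# nothing here changes behavior when the module is imported by other code.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # One-off health check and human-readable summary
    print(hf_monitor.get_status_summary())
    status = hf_monitor.check_endpoint_status()

    # If the endpoint looks scaled to zero, try to wake it before first use
    if not status.get('available', False):
        hf_monitor.handle_scale_to_zero()

    print(hf_monitor.get_performance_metrics())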