|
import requests |
|
import time |
|
import logging |
|
from typing import Dict, Optional |
|
from utils.config import config |
|
|
|
# Module-level logger named after this module, shared by everything in the file.
logger = logging.getLogger(__name__)
|
|
|
class HFEndpointMonitor:
    """Monitor the status and health of a Hugging Face endpoint.

    The monitor probes ``<endpoint>/models`` for availability, can send a
    small chat-completion request to wake the endpoint up, and keeps a few
    counters plus "engagement" state for reporting.
    """

    # HTTP status codes treated as a successful response.
    _OK_STATUS_CODES = (200, 201)
    # Seconds during which check_endpoint_status() reuses its previous result.
    _STATUS_CACHE_SECONDS = 60
    # Pause between consecutive wake-up attempts in handle_scale_to_zero().
    _WAKEUP_RETRY_DELAY_SECONDS = 10
    # Literal "NAME=" fragments removed from the configured URL
    # (presumably left over from a pasted env-var assignment - confirm).
    _ENV_PREFIXES = ('hf_api_endpoint_url=', 'HF_API_ENDPOINT_URL=')
    # Model named in the warm-up chat-completion payload.
    _WARMUP_MODEL = "DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf"
    # Error text reported whenever the URL or the token is missing.
    _NOT_CONFIGURED_ERROR = 'URL or token not configured'

    def __init__(self):
        """Read URL and token from ``config`` and reset all counters/state."""
        raw_url = config.hf_api_url or ""
        self.endpoint_url = self._clean_endpoint_url(raw_url)
        self.hf_token = config.hf_token

        self.is_initialized = False
        self.last_check = 0
        # NOTE(review): check_interval is not consulted anywhere in this class;
        # check_endpoint_status() uses a 60 s window. Confirm which is intended.
        self.check_interval = 300

        self.warmup_attempts = 0
        self.max_warmup_attempts = 3
        self.warmup_count = 0
        self.successful_requests = 0
        self.failed_requests = 0
        # NOTE(review): never updated inside this class.
        self.avg_response_time = 0
        # NOTE(review): reported by get_detailed_status() but never set here.
        self.is_warming_up = False

        # Copy of the most recent check_endpoint_status() result, if any.
        self._last_status: Optional[Dict] = None

        # Engagement tracking, driven by start_hf_analysis()/finish_hf_analysis().
        self._currently_analyzing = False
        # NOTE(review): read by _determine_engagement_level() but never set here.
        self._pending_research = False
        self._last_engagement_time = None
        self._total_engagements = 0
        self._current_research_topic = None

        logger.info("Initialized HF Monitor with URL: %s", self.endpoint_url)

    # ------------------------------------------------------------------ helpers

    def _is_configured(self) -> bool:
        """Return True when both the endpoint URL and the token are set."""
        return bool(self.endpoint_url and self.hf_token)

    def _build_url(self, path: str) -> str:
        """Join ``path`` onto the endpoint URL with exactly one slash."""
        return f"{self.endpoint_url.rstrip('/')}/{path}"

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer-token header sent with every request."""
        return {"Authorization": f"Bearer {self.hf_token}"}

    def _clean_endpoint_url(self, url: str) -> str:
        """Normalise a configured endpoint URL.

        Removes the known ``NAME=`` fragments, strips surrounding whitespace,
        prepends ``https://`` when no http(s) scheme is present and drops
        trailing slashes. An empty or whitespace-only value yields ``""``.
        """
        if not url:
            return ""

        for prefix in self._ENV_PREFIXES:
            url = url.replace(prefix, '')
        url = url.strip()

        if url and not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        return url.rstrip('/')

    # ------------------------------------------------------------ status checks

    def check_endpoint_status(self) -> Dict:
        """Check whether the endpoint is available and initialized.

        Calls made less than 60 seconds after the previous check do not hit
        the network; they return a copy of the previous result, including
        its ``error``/``response_time`` entries when present.

        Returns:
            A dict with ``available``, ``status_code``, ``initialized`` and
            ``timestamp``; ``response_time`` after an HTTP round trip and
            ``error`` on any failure. This method does not raise.
        """
        now = time.time()

        if now - self.last_check < self._STATUS_CACHE_SECONDS:
            if self._last_status is not None:
                return dict(self._last_status)
            # Inside the window but nothing recorded yet.
            return {
                'available': False,
                'status_code': None,
                'initialized': False,
                'timestamp': self.last_check,
            }

        # Recorded before probing so that a failing probe is rate-limited too.
        self.last_check = now

        try:
            if not self._is_configured():
                status_info = {
                    'available': False,
                    'status_code': None,
                    'initialized': False,
                    'error': self._NOT_CONFIGURED_ERROR,
                    'timestamp': time.time(),
                }
            else:
                models_url = self._build_url("models")
                logger.info("Checking HF endpoint at: %s", models_url)

                response = requests.get(
                    models_url,
                    headers=self._auth_headers(),
                    timeout=15,
                )
                is_ok = response.status_code in self._OK_STATUS_CODES

                status_info = {
                    'available': is_ok,
                    'status_code': response.status_code,
                    'initialized': self._is_endpoint_initialized(response),
                    'response_time': response.elapsed.total_seconds(),
                    'timestamp': time.time(),
                }
                if not is_ok:
                    status_info['error'] = (
                        f"HTTP {response.status_code}: {response.text[:200]}"
                    )

            logger.info("HF Endpoint Status: %s", status_info)
        except Exception as e:
            # Boundary handler: a status probe must never raise to its caller.
            error_msg = str(e)
            logger.error("HF endpoint check failed: %s", error_msg)
            status_info = {
                'available': False,
                'status_code': None,
                'initialized': False,
                'error': error_msg,
                'timestamp': time.time(),
            }

        # Keep a private copy so later mutation by the caller cannot alter it.
        self._last_status = dict(status_info)
        return status_info

    def _is_endpoint_initialized(self, response) -> bool:
        """Decide from a ``/models`` response whether the endpoint is ready.

        A JSON object counts as initialized when it has a ``data`` or
        ``models`` key. Any other body (not JSON, or JSON that is not an
        object) falls back to whether the status code is 200/201.
        """
        try:
            data = response.json()
        except ValueError:
            # requests reports undecodable JSON as a ValueError subclass.
            data = None

        if isinstance(data, dict):
            return 'data' in data or 'models' in data
        return response.status_code in self._OK_STATUS_CODES

    def get_status_summary(self) -> str:
        """Return a one-line, human-readable status summary."""
        status = self.check_endpoint_status()
        if not status['available']:
            return "🔴 HF Endpoint: Unavailable"
        if status.get('initialized', False):
            return "🟢 HF Endpoint: Available and Initialized"
        return "🟡 HF Endpoint: Available but Initializing"

    def get_detailed_status(self) -> Dict:
        """Return endpoint status plus optional ``/info`` details.

        Unlike check_endpoint_status() this always performs the requests
        (no rate limiting). Failures are reported through an ``error``
        entry instead of being raised.
        """
        if not self._is_configured():
            return {
                'available': False,
                'status_code': None,
                'initialized': False,
                'error': self._NOT_CONFIGURED_ERROR,
                'last_checked': time.time(),
            }

        try:
            headers = self._auth_headers()
            model_response = requests.get(
                self._build_url("models"),
                headers=headers,
                timeout=10,
            )

            return {
                'available': model_response.status_code in self._OK_STATUS_CODES,
                'status_code': model_response.status_code,
                'initialized': self._is_endpoint_initialized(model_response),
                'endpoint_info': self._fetch_endpoint_info(headers),
                'last_checked': time.time(),
                'warmup_attempts': self.warmup_attempts,
                'is_warming_up': self.is_warming_up,
            }
        except Exception as e:
            # Boundary handler: report the failure instead of raising.
            logger.error("HF detailed status check failed: %s", e)
            return {
                'available': False,
                'status_code': None,
                'initialized': False,
                'error': str(e),
                'last_checked': time.time(),
            }

    def _fetch_endpoint_info(self, headers: Dict[str, str]) -> Dict:
        """Best-effort GET of ``<endpoint>/info``; returns ``{}`` on any failure."""
        try:
            info_response = requests.get(
                self._build_url("info"),
                headers=headers,
                timeout=10,
            )
            if info_response.status_code == 200:
                return info_response.json()
        except Exception as e:
            # /info is optional, so a failure here must not fail the status call.
            logger.debug("HF endpoint info request failed: %s", e)
        return {}

    # ------------------------------------------------------------------ warm-up

    def warm_up_endpoint(self) -> bool:
        """Send one small chat-completion request to initialize the endpoint.

        Returns:
            True when the endpoint answered with 200/201, otherwise False
            (including when URL/token are missing or the request raised).
        """
        if not self._is_configured():
            logger.warning("Cannot warm up HF endpoint - URL or token not configured")
            return False

        self.warmup_attempts += 1
        logger.info("Warming up HF endpoint (attempt %s)...", self.warmup_attempts)

        chat_url = self._build_url("chat/completions")
        logger.info("Sending warm-up request to: %s", chat_url)

        headers = {**self._auth_headers(), "Content-Type": "application/json"}
        payload = {
            "model": self._WARMUP_MODEL,
            "messages": [{"role": "user", "content": "Hello"}],
            "max_tokens": 10,
            "stream": False,
        }

        try:
            response = requests.post(
                chat_url,
                headers=headers,
                json=payload,
                timeout=45,
            )
        except Exception as e:
            # Boundary handler: a failed warm-up is reported, never raised.
            logger.error("HF endpoint warm-up failed: %s", e)
            self.failed_requests += 1
            return False

        success = response.status_code in self._OK_STATUS_CODES
        if success:
            self.is_initialized = True
            self.warmup_count += 1
            self.warmup_attempts = 0
            self.successful_requests += 1
            logger.info("✅ HF endpoint warmed up successfully")
        else:
            # A non-2xx answer is a failed request just like a raised error.
            self.failed_requests += 1
            logger.warning("⚠️ HF endpoint warm-up response: %s", response.status_code)
            logger.debug("Response body: %s", response.text[:500])

        return success

    def handle_scale_to_zero(self) -> bool:
        """Try to wake the endpoint, retrying up to ``max_warmup_attempts`` times.

        Waits 10 seconds between attempts (not after the last one).

        Returns:
            True as soon as one warm-up succeeds, False otherwise.
        """
        logger.info("HF endpoint appears to be scaled to zero. Attempting to wake it up...")

        if not self._is_configured():
            # warm_up_endpoint() cannot succeed; do not sleep through retries.
            logger.warning("Cannot warm up HF endpoint - URL or token not configured")
            return False

        for attempt in range(1, self.max_warmup_attempts + 1):
            logger.info("Wake-up attempt %s/%s", attempt, self.max_warmup_attempts)
            if self.warm_up_endpoint():
                logger.info("✅ HF endpoint successfully woken up!")
                return True
            if attempt < self.max_warmup_attempts:
                time.sleep(self._WAKEUP_RETRY_DELAY_SECONDS)

        logger.error("❌ Failed to wake up HF endpoint after all attempts")
        return False

    # ------------------------------------------------------ metrics/engagement

    def get_performance_metrics(self) -> Dict:
        """Return the warm-up/request counters and the stored average time."""
        return {
            'warmup_count': self.warmup_count,
            'successful_requests': self.successful_requests,
            'failed_requests': self.failed_requests,
            'average_response_time': self.avg_response_time,
        }

    def get_enhanced_status(self) -> Dict:
        """Return check_endpoint_status() merged with engagement tracking data."""
        basic_status = self.check_endpoint_status()
        return {
            **basic_status,
            "engagement_level": self._determine_engagement_level(),
            "last_engagement": self._last_engagement_time,
            "total_engagements": self._total_engagements,
            "current_research_topic": self._current_research_topic,
        }

    def _determine_engagement_level(self) -> str:
        """Return ``idle``, ``analyzing``, ``research_pending`` or ``ready``.

        ``idle`` wins whenever the endpoint has not been warmed up yet.
        """
        if not self.is_initialized:
            return "idle"
        if self._currently_analyzing:
            return "analyzing"
        if self._pending_research:
            return "research_pending"
        return "ready"

    def start_hf_analysis(self, topic: Optional[str] = None) -> None:
        """Mark an analysis as running and record the engagement.

        Args:
            topic: Stored as the current research topic when truthy; a
                previously stored topic is kept otherwise.
        """
        self._currently_analyzing = True
        self._last_engagement_time = time.time()
        self._total_engagements += 1
        if topic:
            self._current_research_topic = topic

    def finish_hf_analysis(self) -> None:
        """Mark the running analysis as finished and clear its topic."""
        self._currently_analyzing = False
        self._current_research_topic = None
|
|
|
|
|
# Shared module-level monitor instance, constructed when this module is imported.
hf_monitor = HFEndpointMonitor()
|
|