File size: 6,903 Bytes
084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a 8cfe660 084503a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
import requests
import time
import logging
from typing import Dict
from utils.config import config
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class HFEndpointMonitor:
    """Monitor Hugging Face endpoint status and health.

    The endpoint is probed with ``GET {endpoint_url}/models`` using a bearer
    token, and the HTTP outcome is classified into a status dict with the keys
    ``status``, ``message``, ``available``, ``initializing`` and ``detailed``.
    The most recent status is cached for a short time so that frequent callers
    do not contact the endpoint on every call.
    """

    # Seconds during which a cached status is served without a new probe.
    QUICK_CACHE_SECONDS = 60
    # Timeout (seconds) for the status probe; kept short for quick response.
    STATUS_TIMEOUT_SECONDS = 10
    # Timeout (seconds) for the wake-up request; cold starts take longer.
    WAKE_UP_TIMEOUT_SECONDS = 60
    # Model name sent in the minimal chat request used to wake the endpoint.
    DEFAULT_WAKE_UP_MODEL = (
        "DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf"
    )
    # HTTP status codes treated as "endpoint is up".
    _SUCCESS_CODES = (200, 201)

    def __init__(self):
        """Read the endpoint URL and token from ``config`` and reset the cache."""
        self.endpoint_url = config.hf_api_url.rstrip('/') if config.hf_api_url else ""
        self.hf_token = config.hf_token
        self.last_check = 0
        # Maximum age (seconds) of the last real probe for the cache to be used.
        self.check_interval = 300  # 5 minutes
        self._cached_status = None
        self._last_detailed_check = 0

    @staticmethod
    def _make_status(status, message, *, available=False,
                     initializing=False, detailed=True) -> Dict:
        """Build a status dict with the five keys every status carries."""
        return {
            "status": status,
            "message": message,
            "available": available,
            "initializing": initializing,
            "detailed": detailed,
        }

    def _probe_endpoint(self) -> Dict:
        """Send one ``GET /models`` request and classify the outcome."""
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        models_url = f"{self.endpoint_url}/models"
        try:
            response = requests.get(
                models_url,
                headers=headers,
                timeout=self.STATUS_TIMEOUT_SECONDS,
            )
        except requests.exceptions.Timeout:
            return self._make_status(
                "timeout",
                "HF endpoint timeout (may be initializing)",
                initializing=True,
            )
        except Exception as e:
            # Deliberately broad: a status probe must report, never raise.
            return self._make_status("error", f"HF endpoint error: {str(e)}")

        code = response.status_code
        if code in self._SUCCESS_CODES:
            return self._make_status(
                "available", "HF endpoint is ready", available=True
            )
        if code == 503:
            return self._make_status(
                "scaled_to_zero", "HF endpoint is scaled to zero"
            )
        return self._make_status("error", f"HF endpoint error: {code}")

    def get_endpoint_status(self) -> Dict:
        """Get current HF endpoint status.

        Returns:
            A status dict (see the class docstring for its keys). The dict is
            a copy, so callers may modify it without affecting the cache.
        """
        current_time = time.time()

        # Serve the cached status only while both the last call and the last
        # real probe are recent enough.
        if (
            self._cached_status
            and current_time - self.last_check < self.QUICK_CACHE_SECONDS
            and current_time - self._last_detailed_check < self.check_interval
        ):
            return dict(self._cached_status)

        self.last_check = current_time

        # Check if configured
        if not self.endpoint_url or not self.hf_token:
            status = self._make_status(
                "not_configured", "HF endpoint not configured", detailed=False
            )
            self._cached_status = status
            return dict(status)

        status = self._probe_endpoint()
        self._cached_status = status
        self._last_detailed_check = current_time
        return dict(status)

    def get_human_readable_status(self) -> str:
        """Get human-readable status message.

        Returns:
            A one-line, emoji-prefixed description of the current status.
        """
        status = self.get_endpoint_status()
        status_messages = {
            "not_configured": "🟡 HF Endpoint: Not configured",
            "available": "🟢 HF Endpoint: Available and ready",
            "scaled_to_zero": "🔴 HF Endpoint: Scaled to zero (send message to wake up)",
            "timeout": "⏳ HF Endpoint: Initializing (may take 4 minutes)",
            "error": f"❌ HF Endpoint: Error - {status.get('message', 'Unknown error')}"
        }
        # Rely on the explicit flag rather than searching the message text,
        # which could match unrelated error messages.
        if status.get("initializing"):
            return status_messages["timeout"]
        return status_messages.get(
            status.get("status"),
            f"⚪ HF Endpoint: {status.get('message', 'Unknown status')}",
        )

    def get_detailed_status(self) -> Dict:
        """Get detailed status information.

        Returns:
            A new dict with the current status; when the endpoint appears to
            be initializing it also carries ``details``, ``eta`` and
            ``action`` keys. The cached status is never modified.
        """
        detailed = dict(self.get_endpoint_status())
        if detailed.get("initializing"):
            detailed.update({
                "details": "No response within the status-check timeout; "
                           "a replica may still be starting",
                "eta": "Initialization may take 2-4 minutes",
                "action": "Please wait for initialization to complete"
            })
        return detailed

    def attempt_wake_up(self, model: str = DEFAULT_WAKE_UP_MODEL,
                        timeout: float = WAKE_UP_TIMEOUT_SECONDS) -> bool:
        """Attempt to wake up the HF endpoint.

        Sends a minimal chat-completion request to
        ``{endpoint_url}/chat/completions``.

        Args:
            model: Model name placed in the request payload.
            timeout: Request timeout in seconds.

        Returns:
            True when the endpoint answered with HTTP 200 or 201; False when
            the monitor is not configured, the request failed, or any other
            status code was returned.
        """
        if not self.endpoint_url or not self.hf_token:
            return False

        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json"
        }
        # Send a minimal request to wake up the endpoint
        payload = {
            "model": model,
            "messages": [{"role": "user", "content": "Hello"}],
            "max_tokens": 10,
            "stream": False
        }
        chat_url = f"{self.endpoint_url}/chat/completions"
        try:
            response = requests.post(
                chat_url,
                headers=headers,
                json=payload,
                timeout=timeout,
            )
        except Exception as e:
            # Best-effort operation: report failure instead of raising.
            logger.warning("Failed to wake up HF endpoint: %s", e)
            return False
        finally:
            # A wake-up request may change the endpoint state, so the next
            # status query must probe again instead of using the cache.
            self._cached_status = None
        return response.status_code in self._SUCCESS_CODES

    def get_initialization_progress(self) -> str:
        """Get initialization progress information.

        Returns:
            A multi-line notice when the endpoint appears to be initializing,
            otherwise an empty string.
        """
        status = self.get_endpoint_status()
        if not status.get("initializing"):
            return ""
        return (
            "\n"
            "🚀 HF Endpoint Initialization in Progress:\n"
            "- Status: No response within the status-check timeout\n"
            "- ETA: 2-4 minutes\n"
            "Please wait for initialization to complete before using the endpoint.\n"
        )
# Global instance: one shared monitor, constructed when this module is
# imported (its __init__ reads the endpoint URL and token from ``config``).
hf_monitor = HFEndpointMonitor()
|