Spaces:
Running
Running
File size: 6,747 Bytes
001a1f0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import logging
import os
import time
from datetime import datetime, timedelta

import redis
class ServerMonitor:
    """Record backend failures/successes in Redis and estimate availability.

    Failures are counted in hourly buckets named
    ``server_failures:YYYY-MM-DD:HH``; the time of the most recent failure and
    success are stored under ``last_failure`` and ``last_success`` as ISO-8601
    strings. All timestamps come from ``datetime.now()`` (naive local time).

    Monitoring is best-effort: if Redis cannot be reached at construction
    time, ``connected`` is False and every method degrades to a no-op or an
    optimistic default instead of raising.
    """

    # Hourly buckets must outlive the 24-hour window summed by
    # get_system_stats(); the extra hour covers the partially elapsed
    # current hour so the oldest bucket read has not expired yet.
    FAILURE_BUCKET_TTL_SECONDS = 25 * 3600
    # Lifetime of the "last_failure" / "last_success" timestamps (24 hours).
    LAST_EVENT_TTL_SECONDS = 24 * 3600

    _log = logging.getLogger(__name__)

    def __init__(self):
        """Connect to Redis using the REDIS_* environment variables.

        Reads REDIS_HOST (default "localhost"), REDIS_PORT (default 6379),
        REDIS_USERNAME and REDIS_PASSWORD. Any failure (bad port value,
        unreachable server, auth error) leaves ``redis_client`` as None and
        ``connected`` as False.
        """
        try:
            self.redis_client = redis.Redis(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", 6379)),
                username=os.getenv("REDIS_USERNAME"),
                password=os.getenv("REDIS_PASSWORD"),
                decode_responses=True,
            )
            # Fail fast here so later calls can rely on self.connected.
            self.redis_client.ping()
            self.connected = True
        except Exception as exc:
            self._log.warning("Redis unavailable, monitoring disabled: %s", exc)
            self.redis_client = None
            self.connected = False

    @staticmethod
    def _failure_key(moment):
        """Return the hourly failure-bucket key that contains *moment*."""
        return f"server_failures:{moment.strftime('%Y-%m-%d:%H')}"

    def _count_failures(self, now, hours):
        """Sum the failure buckets for the *hours* clock hours ending at *now*."""
        keys = [
            self._failure_key(now - timedelta(hours=offset))
            for offset in range(hours)
        ]
        # One MGET instead of one GET per bucket; missing buckets come back
        # as None and are skipped.
        return sum(int(count) for count in self.redis_client.mget(keys) if count)

    def report_failure(self):
        """Report a server failure (e.g., 503 error).

        Increments the current hour's failure bucket and stores the failure
        time under ``last_failure``. Does nothing when Redis is not
        connected; Redis errors are logged and swallowed.
        """
        if not self.connected:
            return
        try:
            now = datetime.now()
            key = self._failure_key(now)
            self.redis_client.incr(key)
            self.redis_client.expire(key, self.FAILURE_BUCKET_TTL_SECONDS)
            # SET with ex= writes value and TTL in one command, so the key
            # can never be left without an expiry.
            self.redis_client.set(
                "last_failure", now.isoformat(), ex=self.LAST_EVENT_TTL_SECONDS
            )
        except Exception:
            # Best-effort: monitoring must never break the main app.
            self._log.debug("Could not record failure", exc_info=True)

    def report_success(self):
        """Report a successful request.

        Deletes the current hour's failure bucket and stores the success
        time under ``last_success``. Does nothing when Redis is not
        connected; Redis errors are logged and swallowed.
        """
        if not self.connected:
            return
        try:
            now = datetime.now()
            # A success resets the failure counter for the current hour.
            self.redis_client.delete(self._failure_key(now))
            self.redis_client.set(
                "last_success", now.isoformat(), ex=self.LAST_EVENT_TTL_SECONDS
            )
        except Exception:
            # Best-effort: monitoring must never break the main app.
            self._log.debug("Could not record success", exc_info=True)

    def check_server_status(self):
        """Check if the server is likely available based on recent activity.

        Returns a dict with keys ``available`` (bool), ``message`` (str) and
        ``estimated_wait`` (int, minutes). The decision order is:

        1. Redis not connected, or any error while checking: available.
        2. More than 3 failures in the current + previous hourly buckets:
           available only if a success was recorded under 15 minutes ago.
        3. A failure under 5 minutes ago that no later success has
           superseded: unavailable, with the remaining wait (at least 1).
        4. Otherwise: available.
        """
        if not self.connected:
            return {
                "available": True,
                "message": "Redis not configured, assuming server available",
                "estimated_wait": 0,
            }
        try:
            now = datetime.now()
            recent_failures = self._count_failures(now, 2)

            last_failure_str, last_success_str = self.redis_client.mget(
                ["last_failure", "last_success"]
            )
            last_failure = (
                datetime.fromisoformat(last_failure_str) if last_failure_str else None
            )
            last_success = (
                datetime.fromisoformat(last_success_str) if last_success_str else None
            )

            if recent_failures > 3:
                if last_success is not None:
                    minutes_since_success = (now - last_success).total_seconds() / 60
                    if minutes_since_success < 15:
                        return {
                            "available": True,
                            "message": "Recent success detected, server likely available",
                            "estimated_wait": 0,
                        }
                # Fixed estimate based on typical warmup time.
                return {
                    "available": False,
                    "message": f"High failure rate detected ({recent_failures} failures recently)",
                    "estimated_wait": 5,
                }

            # A success recorded after the last failure means the server has
            # already recovered, so that failure should not trigger a wait
            # (same rule get_system_stats uses for "Operational").
            recovered = (
                last_failure is not None
                and last_success is not None
                and last_success > last_failure
            )
            if last_failure is not None and not recovered:
                minutes_since_failure = (now - last_failure).total_seconds() / 60
                if minutes_since_failure < 5:
                    return {
                        "available": False,
                        "message": f"Recent failure {int(minutes_since_failure)} minutes ago",
                        "estimated_wait": max(1, 5 - int(minutes_since_failure)),
                    }

            return {
                "available": True,
                "message": "Server appears to be available",
                "estimated_wait": 0,
            }
        except Exception as e:
            # On any Redis or parsing error, assume the server is available.
            return {
                "available": True,
                "message": f"Monitoring check failed: {str(e)}, assuming server available",
                "estimated_wait": 0,
            }

    def get_system_stats(self):
        """Get detailed system statistics.

        Returns a dict with ``failures_last_24h`` (sum of the last 24 hourly
        buckets), ``last_failure`` and ``last_success`` (ISO strings or
        "None recorded") and ``status``. Returns ``{"error": ...}`` when
        Redis is not connected or the lookup fails.
        """
        if not self.connected:
            return {"error": "Redis not configured"}
        try:
            now = datetime.now()
            stats = {"failures_last_24h": self._count_failures(now, 24)}

            last_failure, last_success = self.redis_client.mget(
                ["last_failure", "last_success"]
            )
            stats["last_failure"] = last_failure if last_failure else "None recorded"
            stats["last_success"] = last_success if last_success else "None recorded"

            # Derive a coarse status label from which event happened last.
            if last_failure and last_success:
                failure_time = datetime.fromisoformat(last_failure)
                success_time = datetime.fromisoformat(last_success)
                stats["status"] = (
                    "Operational" if success_time > failure_time else "Degraded"
                )
            elif last_success:
                stats["status"] = "Operational"
            elif last_failure:
                stats["status"] = "Issues Detected"
            else:
                stats["status"] = "Unknown"
            return stats
        except Exception as e:
            return {"error": str(e)}
|