Spaces:
Running
Running
import logging
import os
import time
from datetime import datetime, timedelta

import redis
class ServerMonitor:
    """Record server failures/successes in Redis and estimate availability.

    Failures are counted in hourly buckets named
    ``server_failures:<YYYY-MM-DD>:<HH>``; the timestamps of the most recent
    failure and success are stored under ``last_failure`` and
    ``last_success`` as ISO-8601 strings.

    Every method is best-effort: if Redis could not be reached when the
    instance was created, or a later Redis call fails, the monitor degrades
    to "assume the server is available" instead of raising into the caller.

    NOTE(review): timestamps use naive local time (``datetime.now()``), so
    processes sharing one Redis are assumed to share a timezone -- confirm.
    """

    # Hourly buckets must outlive the 24-hour window that get_system_stats()
    # sums over; a shorter TTL silently truncates that statistic.
    FAILURE_BUCKET_TTL_SECONDS = 25 * 3600
    # Lifetime of the "last_failure" / "last_success" timestamps (24 hours).
    LAST_EVENT_TTL_SECONDS = 86400
    # More than this many failures in the current + previous hour bucket is
    # treated as a high failure rate.
    FAILURE_THRESHOLD = 3
    # A success newer than this many minutes overrides a high failure count.
    RECENT_SUCCESS_MINUTES = 15
    # A failure newer than this many minutes marks the server unavailable;
    # also used as the wait estimate after a high failure rate.
    RECENT_FAILURE_MINUTES = 5

    _log = logging.getLogger(__name__)

    def __init__(self):
        """Connect to Redis using the ``REDIS_*`` environment variables.

        Reads ``REDIS_HOST`` (default ``localhost``), ``REDIS_PORT`` (default
        6379), ``REDIS_USERNAME`` and ``REDIS_PASSWORD``. Any failure,
        including an unparsable port or a failed ping, leaves
        ``self.connected`` False and ``self.redis_client`` None.
        """
        try:
            self.redis_client = redis.Redis(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                username=os.getenv("REDIS_USERNAME"),
                password=os.getenv("REDIS_PASSWORD"),
                decode_responses=True,
            )
            # Test the connection up front so later calls can short-circuit.
            self.redis_client.ping()
            self.connected = True
        except Exception as exc:
            self._log.warning(
                "Redis unavailable (%s); server monitoring disabled", exc
            )
            self.redis_client = None
            self.connected = False

    @staticmethod
    def _bucket_key(moment):
        """Return the key of the hourly failure counter containing *moment*."""
        return f"server_failures:{moment.strftime('%Y-%m-%d:%H')}"

    @staticmethod
    def _minutes_since(iso_timestamp, now):
        """Return minutes elapsed from *iso_timestamp* to *now*, never negative.

        Clamping at zero keeps clock skew between writers from producing
        negative ages in messages and wait estimates.
        """
        elapsed = now - datetime.fromisoformat(iso_timestamp)
        return max(0.0, elapsed.total_seconds() / 60)

    def report_failure(self):
        """Report a server failure (e.g., 503 error).

        Increments the current hour's failure counter and stores the failure
        time. Does nothing when Redis is not connected; Redis errors are
        logged and swallowed so the main app is never broken by monitoring.
        """
        if not self.connected:
            return
        try:
            now = datetime.now()
            key = self._bucket_key(now)
            # One pipeline so the counter can never be left without a TTL if
            # the process dies between the increment and the expire.
            pipe = self.redis_client.pipeline()
            pipe.incr(key)
            pipe.expire(key, self.FAILURE_BUCKET_TTL_SECONDS)
            pipe.set("last_failure", now.isoformat(), ex=self.LAST_EVENT_TTL_SECONDS)
            pipe.execute()
        except Exception:
            self._log.debug("Could not record server failure", exc_info=True)

    def report_success(self):
        """Report a successful request.

        Deletes the current hour's failure counter and stores the success
        time. Does nothing when Redis is not connected; Redis errors are
        logged and swallowed so the main app is never broken by monitoring.
        """
        if not self.connected:
            return
        try:
            now = datetime.now()
            pipe = self.redis_client.pipeline()
            # Reset the failure counter for the current hour only.
            pipe.delete(self._bucket_key(now))
            pipe.set("last_success", now.isoformat(), ex=self.LAST_EVENT_TTL_SECONDS)
            pipe.execute()
        except Exception:
            self._log.debug("Could not record server success", exc_info=True)

    def check_server_status(self):
        """Check if the server is likely available based on recent activity.

        Returns:
            A dict with keys ``available`` (bool), ``message`` (str) and
            ``estimated_wait`` (int, minutes). When Redis is not connected or
            the check itself fails, the server is assumed to be available.
        """
        if not self.connected:
            return {
                "available": True,
                "message": "Redis not configured, assuming server available",
                "estimated_wait": 0,
            }
        try:
            now = datetime.now()
            # Current and previous hour buckets, plus both timestamps, in a
            # single round-trip.
            bucket_keys = [
                self._bucket_key(now - timedelta(hours=offset)) for offset in range(2)
            ]
            *counts, last_failure_str, last_success_str = self.redis_client.mget(
                bucket_keys + ["last_failure", "last_success"]
            )
            failures_last_hour = sum(int(count) for count in counts if count)

            # Many recent failures and no recent success: server might be down.
            if failures_last_hour > self.FAILURE_THRESHOLD:
                if (
                    last_success_str
                    and self._minutes_since(last_success_str, now)
                    < self.RECENT_SUCCESS_MINUTES
                ):
                    return {
                        "available": True,
                        "message": "Recent success detected, server likely available",
                        "estimated_wait": 0,
                    }
                # Estimate wait time based on typical warmup.
                return {
                    "available": False,
                    "message": f"High failure rate detected ({failures_last_hour} failures recently)",
                    "estimated_wait": self.RECENT_FAILURE_MINUTES,
                }

            # A very recent failure on its own is enough to suggest waiting.
            if last_failure_str:
                minutes_since_failure = self._minutes_since(last_failure_str, now)
                if minutes_since_failure < self.RECENT_FAILURE_MINUTES:
                    whole_minutes = int(minutes_since_failure)
                    return {
                        "available": False,
                        "message": f"Recent failure {whole_minutes} minutes ago",
                        "estimated_wait": max(
                            1, self.RECENT_FAILURE_MINUTES - whole_minutes
                        ),
                    }

            return {
                "available": True,
                "message": "Server appears to be available",
                "estimated_wait": 0,
            }
        except Exception as e:
            # On any Redis or parsing error, assume the server is available.
            return {
                "available": True,
                "message": f"Monitoring check failed: {str(e)}, assuming server available",
                "estimated_wait": 0,
            }

    def get_system_stats(self):
        """Get detailed system statistics.

        Returns:
            ``{"error": ...}`` when Redis is not connected or the query
            fails; otherwise a dict with ``failures_last_24h`` (int),
            ``last_failure`` / ``last_success`` (ISO string or
            ``"None recorded"``) and ``status`` (``"Operational"``,
            ``"Degraded"``, ``"Issues Detected"`` or ``"Unknown"``).
        """
        if not self.connected:
            return {"error": "Redis not configured"}
        try:
            now = datetime.now()
            # All 24 hourly buckets and both timestamps in one round-trip.
            bucket_keys = [
                self._bucket_key(now - timedelta(hours=offset)) for offset in range(24)
            ]
            *counts, last_failure, last_success = self.redis_client.mget(
                bucket_keys + ["last_failure", "last_success"]
            )

            stats = {
                "failures_last_24h": sum(int(count) for count in counts if count),
                "last_failure": last_failure or "None recorded",
                "last_success": last_success or "None recorded",
            }

            # Status is decided by whichever event happened most recently.
            if last_failure and last_success:
                succeeded_last = datetime.fromisoformat(
                    last_success
                ) > datetime.fromisoformat(last_failure)
                stats["status"] = "Operational" if succeeded_last else "Degraded"
            elif last_success:
                stats["status"] = "Operational"
            elif last_failure:
                stats["status"] = "Issues Detected"
            else:
                stats["status"] = "Unknown"
            return stats
        except Exception as e:
            return {"error": str(e)}