# (Page-scrape residue, not program text: "Spaces: Sleeping Sleeping")
import os
import threading
import time
from datetime import datetime

import requests
from flask import Flask
# ==== CONFIG ====
# Telegram bot credential. The BOT_TOKEN environment variable takes precedence
# so the secret does not have to live in the source tree.
# SECURITY: the literal fallback is kept only so existing deployments keep
# working unchanged; it is a committed secret and should be rotated and removed.
BOT_TOKEN = (
    os.environ.get("BOT_TOKEN")
    or "7316836080:AAEv7wRHNmgCb707ppG6rpTV0mhEBC4n26A"
)
CHAT_ID = -1002038741539  # Group chat ID
API_URL = "https://raw.githubusercontent.com/n-ce/Uma/main/dynamic_instances.json"
CHECK_INTERVAL = 10  # seconds between polls of API_URL

# Simple health messages, keyed by the value of the "health" field in the
# JSON served at API_URL.
# NOTE(review): "P" and "I" carry the same wording - confirm this is intended.
STATUS_MESSAGES = {
    "U": "U => Health is good, playback can run optimally",
    "P": "P => Health is fair enough, playback can have risks",
    "I": "I => Health is fair enough, playback can have risks",
    "N": "N => Health is compromised, Playback may take time or errors may occur.",
}

# Last health code that was announced; None until a first value is seen.
last_health = None
# Text returned by the HTTP endpoint; refreshed by the monitor loop.
current_status_text = "No data yet"

app = Flask(__name__)
def get_health_status():
    """Return the ``health`` field of the JSON document at ``API_URL``.

    The request uses a 5-second timeout. Any failure (network error, non-2xx
    response, invalid JSON, unexpected document shape) is logged to stdout
    and reported as ``None`` rather than raised.
    """
    try:
        response = requests.get(API_URL, timeout=5)
        response.raise_for_status()
        return response.json().get("health")
    except Exception as err:
        print(f"[ERROR] Failed to fetch health status: {err}")
    return None
def send_telegram_message(text):
    """Send ``text`` to the configured Telegram chat (``CHAT_ID``).

    Best-effort: every failure is logged to stdout and never raised, so
    callers do not need their own error handling. Returns ``None``.
    """
    try:
        url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
        payload = {"chat_id": CHAT_ID, "text": text}
        response = requests.post(url, json=payload, timeout=5)
    except Exception as e:
        # requests exception messages can embed the request URL, which
        # contains the bot token - keep the secret out of the logs.
        detail = str(e)
        if BOT_TOKEN:
            detail = detail.replace(str(BOT_TOKEN), "<redacted>")
        print(f"[ERROR] Failed to send message: {detail}")
        return

    # requests.post does not raise on 4xx/5xx, so the status must be checked
    # explicitly before claiming the message was delivered.
    if not response.ok:
        print(
            f"[ERROR] Failed to send message: "
            f"HTTP {response.status_code}: {response.text}"
        )
        return

    print(f"[BOT] Sent message: {text}")
def send_startup_message():
    """Announce bot startup in the Telegram chat together with the current status.

    Fetches the health code once. When a code is available, it is also stored
    in the module-level ``last_health`` so that the monitor loop, which only
    notifies when the code differs from ``last_health``, does not immediately
    repeat the status that was just announced here.
    """
    global last_health

    health = get_health_status()
    if health:
        msg = STATUS_MESSAGES.get(health, f"Unknown status: {health}")
        # Seed the change detector with the status we are about to report.
        last_health = health
    else:
        msg = "Unable to fetch Uma status at startup."

    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # NOTE(review): the leading "π" looks like a mis-decoded emoji - confirm
    # the intended character before changing it.
    send_telegram_message(f"π Bot started at {start_time}\nCurrent Uma status:\n{msg}")
def monitor_status():
    """Poll the health status forever and notify Telegram when it changes.

    Intended to run in a background thread; it never returns. Each cycle:
    fetch the health code, refresh ``current_status_text`` when a code is
    available, send a Telegram message if the code differs from
    ``last_health``, then sleep ``CHECK_INTERVAL`` seconds.
    """
    global last_health, current_status_text

    while True:
        try:
            health = get_health_status()
            if health:
                msg = STATUS_MESSAGES.get(health, f"Unknown status: {health}")
                current_status_text = msg
                if health != last_health:
                    send_telegram_message(msg)
                    print(f"[UPDATE] Uma status changed to: {health} - {msg}")
                    last_health = health
        except Exception as e:
            # An unexpected error (e.g. an unhashable "health" value passed to
            # dict.get) must not kill the thread: nothing restarts it, and the
            # HTTP endpoint would keep serving stale text.
            print(f"[ERROR] Monitor iteration failed: {e}")
        time.sleep(CHECK_INTERVAL)
@app.route("/")
def home():
    """Serve the most recent status text at the site root.

    Returns the module-level ``current_status_text``, which is
    ``"No data yet"`` until the monitor loop has stored a status message.
    """
    return current_status_text
if __name__ == "__main__":
    # Announce startup first; this is attempted whether or not the status
    # could be fetched.
    send_startup_message()

    # Daemon thread: the poller must not keep the process alive on its own.
    monitor_thread = threading.Thread(target=monitor_status, daemon=True)
    monitor_thread.start()

    # Blocks in the main thread, listening on all interfaces on port 7860.
    app.run(host="0.0.0.0", port=7860)