"""Uma health monitor.

Polls a JSON status document and relays changes of its ``health`` value to a
Telegram group, while a small Flask app serves the latest status text at ``/``.
"""

import os
import threading
import time
from datetime import datetime

import requests
from flask import Flask

# ==== CONFIG ====
# NOTE(review): the fallback token below is committed in source. Prefer
# supplying BOT_TOKEN through the environment; the literal is kept only so
# existing deployments keep working and should be rotated and removed.
BOT_TOKEN = (
    os.environ.get("BOT_TOKEN")
    or "7316836080:AAEv7wRHNmgCb707ppG6rpTV0mhEBC4n26A"
)
CHAT_ID = -1002038741539  # Group chat ID
API_URL = "https://raw.githubusercontent.com/n-ce/Uma/main/dynamic_instances.json"
CHECK_INTERVAL = 10  # seconds

# Simple health messages
STATUS_MESSAGES = {
    "U": "U => Health is good, playback can run optimally",
    "P": "P => Health is fair enough, playback can have risks",
    "I": "I => Health is fair enough, playback can have risks",
    "N": "N => Health is compromised, Playback may take time or errors may occur.",
}

# Shared state: written by the startup hook and the monitor thread,
# read by the Flask route.
last_health = None
current_status_text = "No data yet"

app = Flask(__name__)


def _status_message(health):
    """Return the human-readable message for a health code.

    Values that are not strings (including unhashable JSON values such as
    lists or dicts) are reported as unknown instead of raising ``TypeError``
    from the dictionary lookup.
    """
    unknown = f"Unknown status: {health}"
    if not isinstance(health, str):
        return unknown
    return STATUS_MESSAGES.get(health, unknown)


def get_health_status():
    """Fetch health status from API.

    Returns:
        The value stored under the ``"health"`` key of the JSON document at
        ``API_URL``, or ``None`` if the request fails, the response has an
        error status, the body cannot be parsed, or the key is absent.
    """
    try:
        r = requests.get(API_URL, timeout=5)
        r.raise_for_status()
        data = r.json()
        return data.get("health")
    except Exception as e:
        # Deliberately broad: this runs inside the polling loop, where any
        # failure must degrade to "no data" rather than kill the thread.
        print(f"[ERROR] Failed to fetch health status: {e}")
        return None


def send_telegram_message(text):
    """Send a message to Telegram group.

    Args:
        text: Message body to post to ``CHAT_ID``.

    Returns:
        ``True`` if the request completed with a success status, ``False``
        otherwise. Failures are logged and never raised.
    """
    try:
        url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
        payload = {"chat_id": CHAT_ID, "text": text}
        response = requests.post(url, json=payload, timeout=5)
        # Without this check an HTTP error response would be logged as sent.
        response.raise_for_status()
    except Exception as e:
        # The request URL embeds the bot token and may appear in the
        # exception text, so scrub it before logging.
        detail = str(e).replace(BOT_TOKEN, "<redacted>")
        print(f"[ERROR] Failed to send message: {detail}")
        return False
    print(f"[BOT] Sent message: {text}")
    return True


def send_startup_message():
    """Send bot startup message with current status.

    Also seeds the shared state: ``current_status_text`` is set as soon as a
    status is known, and ``last_health`` is set once the startup message has
    been delivered, so the monitor thread does not immediately repeat the
    same status.
    """
    global last_health, current_status_text

    health = get_health_status()
    if health:
        msg = _status_message(health)
        current_status_text = msg
    else:
        msg = "Unable to fetch Uma status at startup."

    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    delivered = send_telegram_message(
        f"🔔 Bot started at {start_time}\nCurrent Uma status:\n{msg}"
    )

    # Only treat the status as announced if the group actually received it;
    # otherwise leave last_health unset so the monitor reports it.
    if health and delivered:
        last_health = health


def monitor_status():
    """Background thread to monitor Uma health status.

    Loops forever: fetches the health code, refreshes
    ``current_status_text``, and posts to Telegram when the code differs
    from ``last_health``. Sleeps ``CHECK_INTERVAL`` seconds between polls.
    """
    global last_health, current_status_text

    while True:
        health = get_health_status()
        if health:
            msg = _status_message(health)
            current_status_text = msg
            if health != last_health:
                send_telegram_message(msg)
                print(f"[UPDATE] Uma status changed to: {health} - {msg}")
                last_health = health
        time.sleep(CHECK_INTERVAL)


@app.route("/")
def home():
    """Return the most recently observed status text."""
    return current_status_text


if __name__ == "__main__":
    send_startup_message()  # Send startup message no matter what
    threading.Thread(target=monitor_status, daemon=True).start()
    app.run(host="0.0.0.0", port=7860)