stato / app.py
shashwatIDR's picture
Update app.py
aabefe6 verified
import requests
import time
import threading
from flask import Flask
from datetime import datetime
# ==== CONFIG ====
BOT_TOKEN = "7316836080:AAEv7wRHNmgCb707ppG6rpTV0mhEBC4n26A"
CHAT_ID = -1002038741539 # Group chat ID
API_URL = "https://raw.githubusercontent.com/n-ce/Uma/main/dynamic_instances.json"
CHECK_INTERVAL = 10 # seconds
# Simple health messages
STATUS_MESSAGES = {
"U": "U => Health is good, playback can run optimally",
"P": "P => Health is fair enough, playback can have risks",
"I": "I => Health is fair enough, playback can have risks",
"N": "N => Health is compromised, Playback may take time or errors may occur."
}
last_health = None
current_status_text = "No data yet"
app = Flask(__name__)
def get_health_status():
"""Fetch health status from API."""
try:
r = requests.get(API_URL, timeout=5)
r.raise_for_status()
data = r.json()
return data.get("health")
except Exception as e:
print(f"[ERROR] Failed to fetch health status: {e}")
return None
def send_telegram_message(text):
"""Send a message to Telegram group."""
try:
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
payload = {"chat_id": CHAT_ID, "text": text}
requests.post(url, json=payload, timeout=5)
print(f"[BOT] Sent message: {text}")
except Exception as e:
print(f"[ERROR] Failed to send message: {e}")
def send_startup_message():
"""Send bot startup message with current status."""
health = get_health_status()
if health:
msg = STATUS_MESSAGES.get(health, f"Unknown status: {health}")
else:
msg = "Unable to fetch Uma status at startup."
start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
send_telegram_message(f"πŸ”” Bot started at {start_time}\nCurrent Uma status:\n{msg}")
def monitor_status():
"""Background thread to monitor Uma health status."""
global last_health, current_status_text
while True:
health = get_health_status()
if health:
msg = STATUS_MESSAGES.get(health, f"Unknown status: {health}")
current_status_text = msg
if health != last_health:
send_telegram_message(msg)
print(f"[UPDATE] Uma status changed to: {health} - {msg}")
last_health = health
time.sleep(CHECK_INTERVAL)
@app.route("/")
def home():
return current_status_text
if __name__ == "__main__":
send_startup_message() # Send startup message no matter what
threading.Thread(target=monitor_status, daemon=True).start()
app.run(host="0.0.0.0", port=7860)