Spaces:
Runtime error
Runtime error
# brain.py — PTB v13.15 + Flask (no .env, hardcoded token) | |
# -*- coding: utf-8 -*- | |
import sys, socket | |
import os | |
import json | |
import logging | |
import threading | |
from difflib import get_close_matches | |
from urllib.parse import urlparse | |
from flask import Flask, request, render_template, session, redirect, jsonify | |
# ===== App config (hardcoded token) ===== | |
# Every setting may be overridden by an environment variable of the same name.
# The literals are only fallbacks, so real secrets need not live in the source.
BOT_TOKEN = os.environ.get("BOT_TOKEN", "000000000:TEST_TOKEN_PLACEHOLDER")  # or put your real token here if you prefer
APP_HOST = os.environ.get("APP_HOST", "0.0.0.0")
APP_PORT = int(os.environ.get("APP_PORT", "7530"))
SECRET_KEY = os.environ.get("SECRET_KEY", "noura-super-secret")  # assigned to app.secret_key
# ===== Optional internal modules (loaded defensively) ===== | |
# Each helper module is optional: when importing it fails for any reason the
# name is bound to None, and the wrapper functions in this file test for that
# before calling into it.
try:
    import responses
except Exception:
    responses = None
try:
    import analyzer
except Exception:
    analyzer = None
try:
    import media_analyzer
except Exception:
    media_analyzer = None
# ===== Memory API (preferred) ===== | |
try:
    from memory import (
        load_memory as mem_load,
        save_memory as mem_save,
        load_global_memory as mem_load_global,
        save_global_memory as mem_save_global,
    )
except Exception:
    # Fallback minimal memory (local JSON files) if memory.py is not available.
    _GLOBAL_MEMORY_FILE = "global_memory.json"
    # Usernames can come straight from HTTP clients; path separators and NUL
    # must never reach the file name, otherwise the path could leave the
    # working directory or the open() call could fail.
    _UNSAFE_NAME_CHARS = str.maketrans({"/": "_", "\\": "_", "\0": "_"})

    def _mf_user(username: str) -> str:
        """Return the per-user memory file name for *username*."""
        safe = str(username).translate(_UNSAFE_NAME_CHARS)
        return f"memory_{safe}.json"

    def _read_json(path: str) -> dict:
        """Load a JSON file, returning {} when it does not exist."""
        if not os.path.exists(path):
            return {}
        # Context manager so the handle is closed even if parsing raises.
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def _write_json(path: str, data: dict) -> None:
        """Write *data* to *path* as human-readable UTF-8 JSON."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def mem_load(username: str) -> dict:
        """Return the stored memory of *username* ({} if none was saved yet)."""
        return _read_json(_mf_user(username))

    def mem_save(username: str, data: dict) -> None:
        """Persist *data* as the memory of *username*."""
        _write_json(_mf_user(username), data)

    def mem_load_global() -> dict:
        """Return the shared memory ({} if none was saved yet)."""
        return _read_json(_GLOBAL_MEMORY_FILE)

    def mem_save_global(data: dict) -> None:
        """Persist *data* as the shared memory."""
        _write_json(_GLOBAL_MEMORY_FILE, data)
# ===== Telegram v13.15 imports (sync API) ===== | |
from telegram import Update

try:
    # PTB v13.x exports ChatAction from the package root.
    from telegram import ChatAction
except ImportError:
    # Later major versions only provide it in telegram.constants.
    from telegram.constants import ChatAction
from telegram.ext import CallbackContext, MessageHandler, Updater, filters
# ===== Logging ===== | |
# Configure root logging once; "brain" is the logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("brain")
# ===== Flask =====
# The secret key is required because the views keep the username in `session`.
app = Flask(__name__)
app.secret_key = SECRET_KEY
# ===== Helpers ===== | |
def fix_url(url: str) -> str:
    """Return *url* stripped, with an ``https`` scheme added when it has none.

    Empty or ``None`` input yields ``""``. Text that already parses with a
    scheme is returned unchanged (apart from the stripping). Protocol-relative
    input (``//host/...``) gets ``https:``; anything else gets ``https://``.
    """
    cleaned = (url or "").strip()
    # Nothing to do for blank input or for text that already carries a scheme.
    if cleaned == "" or urlparse(cleaned).scheme:
        return cleaned
    prefix = "https:" if cleaned.startswith("//") else "https://"
    return prefix + cleaned
# Media kinds in the order they are tested, each with its file extensions.
_MEDIA_EXTENSIONS = (
    ("image", (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")),
    ("video", (".mp4", ".mov", ".avi", ".mkv", ".webm")),
    ("audio", (".mp3", ".wav", ".ogg", ".m4a", ".flac")),
)


def detect_media_type(url: str) -> str:
    """Classify *url* as "image", "video", "audio" or "link" by file extension.

    The extension is looked up on the URL's path component, so a query string
    or fragment (``photo.jpg?size=large``, ``clip.mp4#t=10``) no longer hides
    it. The whole string is still checked as well, so every input that was
    recognised before is classified the same way. Matching is
    case-insensitive; empty or ``None`` input yields "link".
    """
    lowered = (url or "").lower()
    try:
        path = urlparse(lowered).path
    except ValueError:
        # urlparse rejects some malformed inputs; fall back to the raw text
        # so this function never raises.
        path = lowered
    for kind, extensions in _MEDIA_EXTENSIONS:
        if path.endswith(extensions) or lowered.endswith(extensions):
            return kind
    return "link"
def analyze_url_type_safe(url: str) -> str:
    """Classify *url* through the optional ``analyzer`` module without raising.

    Returns the value of ``analyzer.analyze_url_type(url)``, or "unknown" when
    the module is unavailable, lacks that function, or the call raises (the
    failure is logged as a warning).
    """
    if not analyzer or not hasattr(analyzer, "analyze_url_type"):
        return "unknown"
    try:
        kind = analyzer.analyze_url_type(url)
    except Exception as err:
        log.warning("analyze_url_type failed: %s", err)
        return "unknown"
    return kind
def call_responses_generate(text: str, **kw) -> str:
    """Ask the optional ``responses`` module for a reply to *text*.

    Extra keyword arguments are forwarded to ``responses.generate_reply``.
    Returns "" when the module is unavailable, lacks that function, returns a
    falsy value, or raises (the failure is logged as a warning).
    """
    if not responses or not hasattr(responses, "generate_reply"):
        return ""
    try:
        produced = responses.generate_reply(text, **kw)
    except Exception as err:
        log.warning("responses.generate_reply failed: %s", err)
        return ""
    return produced or ""
def call_media_image(url: str) -> str:
    """Analyze the image at *url* with the optional ``media_analyzer`` module.

    Returns the analyzer's result, or a fixed "image received" notice when the
    module is unavailable, lacks ``analyze_image_from_url``, or the call
    raises (the failure is logged as a warning).
    """
    unavailable = "تم استلام الصورة (معالج الصور غير متاح)."
    if not media_analyzer or not hasattr(media_analyzer, "analyze_image_from_url"):
        return unavailable
    try:
        outcome = media_analyzer.analyze_image_from_url(url)
    except Exception as err:
        log.warning("analyze_image_from_url failed: %s", err)
        return unavailable
    return outcome
def call_media_video(url: str) -> str:
    """Analyze the video at *url* with the optional ``media_analyzer`` module.

    Returns the analyzer's result, or a fixed "video received" notice when the
    module is unavailable, lacks ``analyze_video_from_url``, or the call
    raises (the failure is logged as a warning).
    """
    unavailable = "تم استلام الفيديو (معالج الفيديو غير متاح)."
    if not media_analyzer or not hasattr(media_analyzer, "analyze_video_from_url"):
        return unavailable
    try:
        outcome = media_analyzer.analyze_video_from_url(url)
    except Exception as err:
        log.warning("analyze_video_from_url failed: %s", err)
        return unavailable
    return outcome
def call_media_audio(url: str) -> str:
    """Analyze the audio at *url* with the optional ``media_analyzer`` module.

    Returns the analyzer's result, or a fixed "audio received" notice when the
    module is unavailable, lacks ``analyze_audio_from_url``, or the call
    raises (the failure is logged as a warning).
    """
    unavailable = "تم استلام الصوت (معالج الصوت غير متاح)."
    if not media_analyzer or not hasattr(media_analyzer, "analyze_audio_from_url"):
        return unavailable
    try:
        outcome = media_analyzer.analyze_audio_from_url(url)
    except Exception as err:
        log.warning("analyze_audio_from_url failed: %s", err)
        return unavailable
    return outcome
# ===== Core reply ===== | |
def _looks_like_url(text: str) -> bool:
    """Return True when *text* is plausibly a link rather than ordinary chat text."""
    candidate = (text or "").strip()
    # A link is a single token; anything containing whitespace is a sentence.
    if not candidate or any(ch.isspace() for ch in candidate):
        return False
    lowered = candidate.lower()
    if lowered.startswith(("http://", "https://", "//", "www.")):
        return True
    # Bare host such as "example.com/path": dotted labels made of host
    # characters, ending in an alphabetic label of two or more characters.
    host = lowered.split("/", 1)[0]
    if not all(ch.isalnum() or ch in "-." for ch in host):
        return False
    labels = host.split(".")
    return (
        len(labels) >= 2
        and all(labels)
        and len(labels[-1]) >= 2
        and labels[-1].isalpha()
    )


def generate_reply(message: str, username: str = "مجهول") -> str:
    """Produce a reply to *message* for *username* and remember it.

    Resolution order:
      1. exact hit in the user's memory;
      2. closest fuzzy match (cutoff 0.6) among the global memory keys;
      3. if the message looks like a link: media analysis by extension, or the
         URL-type analyzer for other links;
      4. otherwise the optional responses module, falling back to an automatic
         reply that echoes the message reversed.

    A freshly computed reply (steps 3-4) is stored in both the user memory and
    the global memory before being returned.
    """
    user_mem = mem_load(username)
    global_mem = mem_load_global()

    # 1) exact hit in user memory
    if message in user_mem:
        return user_mem[message]

    # 2) fuzzy search in global memory
    gm_keys = list(global_mem.keys())
    if gm_keys:
        m = get_close_matches(message, gm_keys, n=1, cutoff=0.6)
        if m:
            return global_mem[m[0]]

    # 3) URLs / media.
    # fix_url() prepends "https://" to any scheme-less text, so it must only
    # be applied to input that already looks like a link; otherwise every
    # ordinary message would be handled as a URL and step 4 never reached.
    fixed = fix_url(message) if _looks_like_url(message) else ""
    if fixed.startswith(("http://", "https://")):
        mtype = detect_media_type(fixed)
        if mtype == "image":
            reply = f"تحليل الصورة:\n{call_media_image(fixed)}"
        elif mtype == "video":
            reply = f"تحليل الفيديو:\n{call_media_video(fixed)}"
        elif mtype == "audio":
            reply = f"تحليل الصوت:\n{call_media_audio(fixed)}"
        else:
            kind = analyze_url_type_safe(fixed)
            reply = f"الرابط من نوع: {kind}"
    else:
        # 4) use responses module if present, else fallback
        alt = call_responses_generate(message, analysis={}) or ""
        reply = alt if alt else f"رد تلقائي: {message[::-1]}"

    # 5) persist
    user_mem[message] = reply
    global_mem[message] = reply
    mem_save(username, user_mem)
    mem_save_global(global_mem)
    return reply
# ===== Telegram Handlers (v13 sync) ===== | |
def tg_send_action(update: Update, context: CallbackContext, action: ChatAction):
    """Send a chat action (e.g. "typing") to the update's chat, best-effort.

    The indicator is purely cosmetic, so any failure is logged at debug level
    and never propagated to the calling handler.
    """
    try:
        context.bot.send_chat_action(chat_id=update.effective_chat.id, action=action)
    except Exception as e:
        # Includes the case where the update has no effective chat.
        log.debug("send_chat_action failed: %s", e)
def handle_text(update: Update, context: CallbackContext):
    """Answer a text message with the reply produced by ``generate_reply``.

    The sender's numeric id (as a string) is used as the memory username, or
    the placeholder name when the update has no effective user. Any failure is
    logged and answered with a generic error message.
    """
    msg = update.message
    if not msg:
        return
    text = msg.text or ""
    tg_send_action(update, context, ChatAction.TYPING)
    try:
        sender = update.effective_user
        username = str(sender.id) if sender else "مجهول"
        msg.reply_text(generate_reply(text, username=username))
    except Exception as err:
        log.exception("Text handler error: %s", err)
        msg.reply_text("حدث خطأ أثناء معالجة الرسالة.")
def handle_photo(update: Update, context: CallbackContext):
    """Analyze an incoming photo and reply with the result.

    The last entry of ``message.photo`` is fetched and its ``file_path`` is
    handed to ``call_media_image``. On any failure the error is logged and a
    plain acknowledgement is sent instead.
    """
    msg = update.message
    if not (msg and msg.photo):
        return
    tg_send_action(update, context, ChatAction.UPLOAD_PHOTO)
    try:
        tg_file = msg.photo[-1].get_file()
        # file_path is the URL passed on to the analyzer.
        msg.reply_text(call_media_image(tg_file.file_path))
    except Exception as err:
        log.exception("Photo handler error: %s", err)
        msg.reply_text("تم استلام الصورة.")
def handle_video(update: Update, context: CallbackContext):
    """Analyze an incoming video and reply with the result.

    The video's ``file_path`` is handed to ``call_media_video``. On any
    failure the error is logged and a plain acknowledgement is sent instead.
    """
    msg = update.message
    if not (msg and msg.video):
        return
    tg_send_action(update, context, ChatAction.UPLOAD_VIDEO)
    try:
        tg_file = msg.video.get_file()
        msg.reply_text(call_media_video(tg_file.file_path))
    except Exception as err:
        log.exception("Video handler error: %s", err)
        msg.reply_text("تم استلام الفيديو.")
def handle_audio(update: Update, context: CallbackContext):
    """Analyze an incoming audio file or voice note and reply with the result.

    ``message.audio`` takes precedence over ``message.voice``; the chosen
    attachment's ``file_path`` is handed to ``call_media_audio``. On any
    failure the error is logged and a plain acknowledgement is sent instead.
    """
    msg = update.message
    if not msg:
        return
    if not (msg.audio or msg.voice):
        return
    tg_send_action(update, context, ChatAction.RECORD_AUDIO)
    try:
        attachment = msg.audio or msg.voice
        tg_file = attachment.get_file()
        msg.reply_text(call_media_audio(tg_file.file_path))
    except Exception as err:
        log.exception("Audio handler error: %s", err)
        msg.reply_text("تم استلام الصوت.")
def _run_tg_updater():
    """Build the PTB v13 updater, register the message handlers and start polling.

    Intended to run on a background thread (see ``run_telegram_bot_thread``).
    Start-up failures are logged instead of surfacing only as an unhandled
    thread traceback.
    """
    # Imported locally so the v13 names this function relies on are always in
    # scope: Updater for the polling loop and the v13-style Filters class
    # (Filters.text, Filters.photo, ...) used by the handlers below.
    from telegram.ext import Filters, Updater

    try:
        updater = Updater(BOT_TOKEN, use_context=True)
        dp = updater.dispatcher
        dp.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
        dp.add_handler(MessageHandler(Filters.photo, handle_photo))
        dp.add_handler(MessageHandler(Filters.video, handle_video))
        dp.add_handler(MessageHandler(Filters.audio | Filters.voice, handle_audio))
        log.info("Telegram bot running (PTB v13.15)…")
        updater.start_polling()
        # No updater.idle() here: idle() installs signal handlers, which Python
        # only allows from the main thread, and this function runs on a worker
        # thread. start_polling() already runs the polling loop on its own
        # threads, so there is nothing left to block on.
    except Exception as e:
        log.exception("Telegram bot failed to start: %s", e)
def run_telegram_bot_thread():
    """Launch ``_run_tg_updater`` on a daemon thread and return immediately."""
    threading.Thread(target=_run_tg_updater, daemon=True).start()
# ===== Flask routes ===== | |
# Registered at "/" because the other views redirect there when no user is
# logged in; without a route the view would never be reachable.
@app.route("/")
def home():
    """Serve the login page, or a plain "OK" when the template file is absent."""
    if os.path.exists("templates/login.html"):
        return render_template("login.html")
    return "OK"
# NOTE(review): the "/chat" path is inferred from the view name — confirm it
# matches the action of the login form before relying on it.
@app.route("/chat", methods=["GET", "POST"])
def chat():
    """Log the user in (POST) and show the chat page.

    A POST stores the submitted ``username`` form field (or the placeholder
    name) in the session. A request without a session username is redirected
    to "/". The chat template is rendered when present; otherwise a short
    greeting is returned.
    """
    # Local import: only the template-less fallback below needs it.
    from html import escape

    if request.method == "POST":
        session["username"] = request.form.get("username", "مجهول")
    elif "username" not in session:
        return redirect("/")

    username = session["username"]
    if os.path.exists("templates/index.html"):
        return render_template("index.html", username=username)
    # The name is user-supplied and this string is served as HTML, so it must
    # be escaped to prevent script injection.
    return f"Hello {escape(str(username))}"
# NOTE(review): the "/api" path is inferred from the view name — confirm it
# matches the URL the front-end posts to.
@app.route("/api", methods=["POST"])
def api():
    """Return ``{"reply": ...}`` for a JSON body with ``username`` and ``message``.

    A missing, malformed or non-object body is treated as empty, so the
    defaults apply instead of the request failing. Both fields are coerced to
    ``str`` before being handed to ``generate_reply``.
    """
    # silent=True yields None instead of aborting on a bad or non-JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get("username")
    username = "مجهول" if username is None else str(username)
    message = data.get("message")
    message = "" if message is None else str(message)
    # NOTE(review): the username is taken from the request body, not from the
    # session — confirm that callers are meant to choose it freely.
    return jsonify({"reply": generate_reply(message, username)})
# NOTE(review): the "/memory" path is inferred from the view name — confirm it
# matches the links used by the templates.
@app.route("/memory")
def view_memory():
    """Show the logged-in user's stored memory.

    Redirects to "/" when no username is in the session. Renders the memory
    template when present; otherwise returns the memory as JSON.
    """
    if "username" not in session:
        return redirect("/")
    username = session["username"]
    memory = mem_load(username)
    if os.path.exists("templates/memory.html"):
        return render_template("memory.html", username=username, memory=memory)
    return jsonify(memory)
# ===== Main ===== | |
if __name__ == "__main__":
    # Run the Telegram bot in a separate thread.
    run_telegram_bot_thread()
    # Run Flask on APP_HOST:APP_PORT (blocks the main thread).
    token_set = bool(BOT_TOKEN)
    print(f"[brain] Flask listening on {APP_HOST}:{APP_PORT} (token set={token_set})")
    app.run(host=APP_HOST, port=APP_PORT)