Test / brain.py
mrwabnalas40's picture
Upload 10 files
7212614 verified
# brain.py — PTB v13.15 + Flask (no .env, hardcoded token)
# -*- coding: utf-8 -*-
import sys, socket
import os
import json
import logging
import threading
from difflib import get_close_matches
from urllib.parse import urlparse
from flask import Flask, request, render_template, session, redirect, jsonify
# ===== App config (hardcoded token) =====
BOT_TOKEN = "000000000:TEST_TOKEN_PLACEHOLDER" # ← ضع توكنك الحقيقي هنا إن رغبت
APP_HOST = "0.0.0.0"
APP_PORT = 7530
SECRET_KEY = "noura-super-secret"
# ===== Optional internal modules (loaded defensively) =====
try:
import responses
except Exception:
responses = None
try:
import analyzer
except Exception:
analyzer = None
try:
import media_analyzer
except Exception:
media_analyzer = None
# ===== Memory API (preferred) =====
try:
from memory import (
load_memory as mem_load,
save_memory as mem_save,
load_global_memory as mem_load_global,
save_global_memory as mem_save_global,
)
except Exception:
# Fallback minimal memory (local JSON files) if memory.py not available
def _mf_user(username: str) -> str:
return f"memory_{username}.json"
def mem_load(username: str):
f = _mf_user(username)
return json.load(open(f, encoding="utf-8")) if os.path.exists(f) else {}
def mem_save(username: str, data: dict):
with open(_mf_user(username), "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def mem_load_global():
return json.load(open("global_memory.json", encoding="utf-8")) if os.path.exists("global_memory.json") else {}
def mem_save_global(data: dict):
with open("global_memory.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# ===== Telegram v13.15 imports (sync API) =====
from telegram import Update
from telegram.constants import ChatAction
from telegram.ext import MessageHandler, filters, CallbackContext
# ===== Logging =====
logging.basicConfig(
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
level=logging.INFO,
)
log = logging.getLogger("brain")
# ===== Flask =====
app = Flask(__name__)
app.secret_key = SECRET_KEY
# ===== Helpers =====
def fix_url(url: str) -> str:
url = (url or "").strip()
if not url:
return url
parsed = urlparse(url)
if not parsed.scheme:
if url.startswith("//"):
return "https:" + url
return "https://" + url
return url
def detect_media_type(url: str) -> str:
u = (url or "").lower()
if u.endswith((".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp")):
return "image"
if u.endswith((".mp4", ".mov", ".avi", ".mkv", ".webm")):
return "video"
if u.endswith((".mp3", ".wav", ".ogg", ".m4a", ".flac")):
return "audio"
return "link"
def analyze_url_type_safe(url: str) -> str:
if analyzer and hasattr(analyzer, "analyze_url_type"):
try:
return analyzer.analyze_url_type(url)
except Exception as e:
log.warning("analyze_url_type failed: %s", e)
return "unknown"
def call_responses_generate(text: str, **kw) -> str:
if responses and hasattr(responses, "generate_reply"):
try:
return responses.generate_reply(text, **kw) or ""
except Exception as e:
log.warning("responses.generate_reply failed: %s", e)
return ""
def call_media_image(url: str) -> str:
if media_analyzer and hasattr(media_analyzer, "analyze_image_from_url"):
try:
return media_analyzer.analyze_image_from_url(url)
except Exception as e:
log.warning("analyze_image_from_url failed: %s", e)
return "تم استلام الصورة (معالج الصور غير متاح)."
def call_media_video(url: str) -> str:
if media_analyzer and hasattr(media_analyzer, "analyze_video_from_url"):
try:
return media_analyzer.analyze_video_from_url(url)
except Exception as e:
log.warning("analyze_video_from_url failed: %s", e)
return "تم استلام الفيديو (معالج الفيديو غير متاح)."
def call_media_audio(url: str) -> str:
if media_analyzer and hasattr(media_analyzer, "analyze_audio_from_url"):
try:
return media_analyzer.analyze_audio_from_url(url)
except Exception as e:
log.warning("analyze_audio_from_url failed: %s", e)
return "تم استلام الصوت (معالج الصوت غير متاح)."
# ===== Core reply =====
def generate_reply(message: str, username: str = "مجهول") -> str:
user_mem = mem_load(username)
global_mem = mem_load_global()
# 1) exact hit in user memory
if message in user_mem:
return user_mem[message]
# 2) fuzzy search in global memory
gm_keys = list(global_mem.keys())
if gm_keys:
m = get_close_matches(message, gm_keys, n=1, cutoff=0.6)
if m:
return global_mem[m[0]]
# 3) URLs / media
fixed = fix_url(message)
reply = ""
if fixed.startswith("http://") or fixed.startswith("https://"):
mtype = detect_media_type(fixed)
if mtype == "image":
reply = f"تحليل الصورة:\n{call_media_image(fixed)}"
elif mtype == "video":
reply = f"تحليل الفيديو:\n{call_media_video(fixed)}"
elif mtype == "audio":
reply = f"تحليل الصوت:\n{call_media_audio(fixed)}"
else:
kind = analyze_url_type_safe(fixed)
reply = f"الرابط من نوع: {kind}"
else:
# 4) use responses module if present, else fallback
alt = call_responses_generate(message, analysis={}) or ""
reply = alt if alt else f"رد تلقائي: {message[::-1]}"
# 5) persist
user_mem[message] = reply
global_mem[message] = reply
mem_save(username, user_mem)
mem_save_global(global_mem)
return reply
# ===== Telegram Handlers (v13 sync) =====
def tg_send_action(update: Update, context: CallbackContext, action: ChatAction):
try:
context.bot.send_chat_action(chat_id=update.effective_chat.id, action=action)
except Exception:
pass
def handle_text(update: Update, context: CallbackContext):
if not update.message:
return
text = update.message.text or ""
tg_send_action(update, context, ChatAction.TYPING)
try:
resp = generate_reply(text, username=str(update.effective_user.id) if update.effective_user else "مجهول")
update.message.reply_text(resp)
except Exception as e:
log.exception("Text handler error: %s", e)
update.message.reply_text("حدث خطأ أثناء معالجة الرسالة.")
def handle_photo(update: Update, context: CallbackContext):
if not update.message or not update.message.photo:
return
tg_send_action(update, context, ChatAction.UPLOAD_PHOTO)
try:
file = update.message.photo[-1].get_file()
url = file.file_path # Telegram CDN URL
resp = call_media_image(url)
update.message.reply_text(resp)
except Exception as e:
log.exception("Photo handler error: %s", e)
update.message.reply_text("تم استلام الصورة.")
def handle_video(update: Update, context: CallbackContext):
if not update.message or not update.message.video:
return
tg_send_action(update, context, ChatAction.UPLOAD_VIDEO)
try:
file = update.message.video.get_file()
url = file.file_path
resp = call_media_video(url)
update.message.reply_text(resp)
except Exception as e:
log.exception("Video handler error: %s", e)
update.message.reply_text("تم استلام الفيديو.")
def handle_audio(update: Update, context: CallbackContext):
if not update.message or not (update.message.audio or update.message.voice):
return
tg_send_action(update, context, ChatAction.RECORD_AUDIO)
try:
fobj = update.message.audio or update.message.voice
file = fobj.get_file()
url = file.file_path
resp = call_media_audio(url)
update.message.reply_text(resp)
except Exception as e:
log.exception("Audio handler error: %s", e)
update.message.reply_text("تم استلام الصوت.")
def _run_tg_updater():
updater = Updater(BOT_TOKEN, use_context=True)
dp = updater.dispatcher
dp.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
dp.add_handler(MessageHandler(filters.PHOTO, handle_photo))
dp.add_handler(MessageHandler(filters.VIDEO, handle_video))
dp.add_handler(MessageHandler(filters.AUDIO | filters.VOICE, handle_audio))
log.info("Telegram bot running (PTB v13.15)…")
updater.start_polling()
updater.idle()
def run_telegram_bot_thread():
t = threading.Thread(target=_run_tg_updater, daemon=True)
t.start()
# ===== Flask routes =====
@app.route("/")
def home():
return render_template("login.html") if os.path.exists("templates/login.html") else "OK"
@app.route("/chat", methods=["GET", "POST"])
def chat():
if request.method == "POST":
session["username"] = request.form.get("username", "مجهول")
return render_template("index.html", username=session["username"]) if os.path.exists("templates/index.html") else f"Hello {session['username']}"
if "username" in session:
return render_template("index.html", username=session["username"]) if os.path.exists("templates/index.html") else f"Hello {session['username']}"
return redirect("/")
@app.route("/api", methods=["POST"])
def api():
data = request.json or {}
username = data.get("username", "مجهول")
message = data.get("message", "")
return jsonify({"reply": generate_reply(message, username)})
@app.route("/memory")
def view_memory():
if "username" not in session:
return redirect("/")
memory = mem_load(session["username"])
return render_template("memory.html", username=session["username"], memory=memory) if os.path.exists("templates/memory.html") else jsonify(memory)
# ===== Main =====
if __name__ == "__main__":
# شغّل بوت تيليجرام في Thread منفصل
run_telegram_bot_thread()
# شغّل Flask على 0.0.0.0:APP_PORT
print(f"[brain] Flask listening on {APP_HOST}:{APP_PORT} (token set={bool(BOT_TOKEN)})")
app.run(host=APP_HOST, port=APP_PORT)