Spaces:
Running
Running
import os | |
import random | |
import logging | |
import json | |
import time | |
from datetime import datetime, timedelta | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
MessageHandler, | |
CallbackQueryHandler, | |
ContextTypes, | |
filters, | |
) | |
from telegram.error import TelegramError | |
from dotenv import load_dotenv | |
from database import Database | |
from utils import ( | |
escape_markdown, | |
generate_positive_message, | |
format_ad, | |
validate_phone, | |
send_admin_notification, | |
post_to_channel, | |
delete_channel_post, | |
generate_reactions, | |
) | |
import nltk | |
from nltk.tokenize import word_tokenize | |
from enum import Enum | |
# Download NLTK data | |
try: | |
nltk.download('punkt', quiet=True) | |
nltk.download('punkt_tab', quiet=True) | |
except Exception as e: | |
logging.error(f"Failed to download NLTK data: {str(e)}") | |
# Logging setup | |
logging.basicConfig( | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
level=logging.INFO, | |
handlers=[ | |
logging.FileHandler("bot.log", encoding='utf-8'), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
# Load environment variables | |
load_dotenv() | |
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") | |
ADMIN_CHAT_ID = os.getenv("ADMIN_CHAT_ID") | |
CHANNEL_ID = os.getenv("CHANNEL_ID") | |
SUPPORT_USERNAME = os.getenv("SUPPORT_USERNAME", "@Support") | |
# Initialize database | |
db = Database() | |
# قاموس للمصطلحات المحلية | |
local_terms = { | |
"اوباما": "كرايسلر 300", | |
"غراضها": "قطع غيار" | |
} | |
# قاعدة بيانات محلية موسعة للسيارات الشائعة في العراق | |
popular_cars = [ | |
{ | |
"name": "تويوتا كورولا", | |
"model_year": "2016-2020", | |
"price": "4000-6000 دولار", | |
"type": "سيدان", | |
"maintenance": "سهلة ومتوفرة، قطع غيار متوفرة بكثرة", | |
"maintenance_locations": "بغداد، البصرة، أربيل", | |
"maintenance_cost": "150-300 ألف دينار سنويًا", | |
"popularity": "أكثر السيارات مبيعاً", | |
"location": "بغداد، البصرة", | |
"fuel_efficiency": "اقتصادية جدًا (12-15 كم/لتر)", | |
"suitable_for": "العائلات والشباب", | |
"spare_parts": "متوفرة في معظم ورش الصيانة بأسعار معقولة", | |
"reliability": "عالية جدًا" | |
}, | |
{ | |
"name": "هونداي اكسنت", | |
"model_year": "2015-2019", | |
"price": "3500-5000 دولار", | |
"type": "سيدان", | |
"maintenance": "متوسطة، قطع غيار متوفرة", | |
"maintenance_locations": "أربيل، النجف، بغداد", | |
"maintenance_cost": "200-400 ألف دينار سنويًا", | |
"popularity": "شائعة بين الشباب", | |
"location": "أربيل، النجف", | |
"fuel_efficiency": "جيدة (10-13 كم/لتر)", | |
"suitable_for": "الأفراد", | |
"spare_parts": "متوفرة لكن قد تكون أغلى قليلاً", | |
"reliability": "جيدة" | |
}, | |
{ | |
"name": "كيا ريو", | |
"model_year": "2014-2018", | |
"price": "4500-5500 دولار", | |
"type": "سيدان", | |
"maintenance": "متوسطة، قطع غيار متوفرة", | |
"maintenance_locations": "كربلاء، الموصل، بغداد", | |
"maintenance_cost": "250-450 ألف دينار سنويًا", | |
"popularity": "مناسبة للعائلات الصغيرة", | |
"location": "كربلاء، الموصل", | |
"fuel_efficiency": "متوسطة (9-12 كم/لتر)", | |
"suitable_for": "العائلات الصغيرة", | |
"spare_parts": "متوفرة في معظم المدن", | |
"reliability": "جيدة" | |
}, | |
{ | |
"name": "نيسان سنترا", | |
"model_year": "2016-2020", | |
"price": "5000-7000 دولار", | |
"type": "سيدان", | |
"maintenance": "متوسطة، قطع غيار باهظة قليلاً", | |
"maintenance_locations": "السليمانية، بغداد", | |
"maintenance_cost": "300-600 ألف دينار سنويًا", | |
"popularity": "محبوبة لقوتها", | |
"location": "السليمانية", | |
"fuel_efficiency": "جيدة (11-14 كم/لتر)", | |
"suitable_for": "الأفراد", | |
"spare_parts": "متوفرة لكن نادرة في بعض المناطق", | |
"reliability": "متوسطة إلى عالية" | |
}, | |
{ | |
"name": "تويوتا لاندكروزر", | |
"model_year": "2010-2015", | |
"price": "15000-25000 دولار", | |
"type": "دفع رباعي", | |
"maintenance": "متوسطة، قطع غيار متوفرة", | |
"maintenance_locations": "بغداد، دهوك، السليمانية", | |
"maintenance_cost": "500-1000 ألف دينار سنويًا", | |
"popularity": "شائعة في المناطق الريفية", | |
"location": "بغداد، دهوك", | |
"fuel_efficiency": "ثقيلة (6-8 كم/لتر)", | |
"suitable_for": "المناطق الوعرة", | |
"spare_parts": "متوفرة في المدن الكبرى", | |
"reliability": "عالية جدًا" | |
}, | |
{ | |
"name": "كرايسلر 300", | |
"model_year": "2018-2022", | |
"price": "3000000-4000000 دينار عراقي", | |
"type": "سيدان", | |
"maintenance": "متوسطة، قطع غيار متوفرة في المدن الكبرى", | |
"maintenance_locations": "بغداد، البصرة", | |
"maintenance_cost": "400-700 ألف دينار سنويًا", | |
"popularity": "شائعة بين عشاق السيارات الفاخرة", | |
"location": "بغداد، البصرة", | |
"fuel_efficiency": "متوسطة (8-10 كم/لتر)", | |
"suitable_for": "الأفراد والعائلات", | |
"spare_parts": "متوفرة لكن قد تكون باهظة", | |
"reliability": "جيدة" | |
}, | |
{ | |
"name": "هيونداي سوناتا", | |
"model_year": "2015-2019", | |
"price": "6000-8000 دولار", | |
"type": "سيدان", | |
"maintenance": "متوسطة، قطع غيار متوفرة", | |
"maintenance_locations": "بغداد، كربلاء، النجف", | |
"maintenance_cost": "300-500 ألف دينار سنويًا", | |
"popularity": "شائعة بين العائلات", | |
"location": "بغداد، كربلاء", | |
"fuel_efficiency": "جيدة (10-12 كم/لتر)", | |
"suitable_for": "العائلات", | |
"spare_parts": "متوفرة بأسعار معقولة", | |
"reliability": "جيدة" | |
}, | |
{ | |
"name": "تويوتا برادو", | |
"model_year": "2012-2018", | |
"price": "12000-18000 دولار", | |
"type": "دفع رباعي", | |
"maintenance": "متوسطة، قطع غيار متوفرة", | |
"maintenance_locations": "أربيل، دهوك، بغداد", | |
"maintenance_cost": "400-800 ألف دينار سنويًا", | |
"popularity": "شائعة في المناطق الريفية والمدن", | |
"location": "أربيل، دهوك", | |
"fuel_efficiency": "متوسطة (7-9 كم/لتر)", | |
"suitable_for": "العائلات والمناطق الوعرة", | |
"spare_parts": "متوفرة في المدن الكبرى", | |
"reliability": "عالية" | |
} | |
] | |
# تعريف الحالات باستخدام enum | |
class UserState(Enum): | |
AD_INTRO = "ad_intro" | |
AD_CAR_NAME_AR = "ad_car_name_ar" | |
AD_CAR_NAME_EN = "ad_car_name_en" | |
AD_PRICE = "ad_price" | |
AD_MODEL = "ad_model" | |
AD_ENGINE_SPECS = "ad_engine_specs" | |
AD_MILEAGE = "ad_mileage" | |
AD_ORIGIN = "ad_origin" | |
AD_PLATE = "ad_plate" | |
AD_ACCIDENTS = "ad_accidents" | |
AD_SPECS = "ad_specs" | |
AD_NOTES = "ad_notes" | |
AD_LOCATION = "ad_location" | |
AD_PHONE = "ad_phone" | |
AD_PHOTOS = "ad_photos" | |
SEARCHING_AD = "searching_ad" | |
CAR_GPT = "car_gpt" | |
ADMIN_EDIT = "admin_edit" | |
MAINTENANCE = "maintenance" | |
SPARE_PARTS = "spare_parts" | |
# Cache لتسريع الاستجابة | |
recommendation_cache = {} | |
def load_scraped_data(filename: str = "cars_data.json") -> list: | |
if not os.path.exists(filename): | |
logger.warning(f"ملف {filename} غير موجود. سيتم إرجاع قائمة فارغة.") | |
return [] | |
try: | |
with open(filename, "r", encoding="utf-8") as f: | |
data = json.load(f) | |
if not isinstance(data, list): | |
logger.error(f"ملف {filename} لا يحتوي على قائمة صالحة.") | |
return [] | |
return data | |
except json.JSONDecodeError as e: | |
logger.error(f"خطأ في تحميل {filename}: {str(e)}") | |
return [] | |
except Exception as e: | |
logger.error(f"خطأ غير متوقع في تحميل {filename}: {str(e)}") | |
return [] | |
# تحميل بيانات السكرابر | |
scraped_cars = load_scraped_data() | |
# تحليل السؤال باستخدام NLP | |
def analyze_question(text: str) -> dict: | |
try: | |
tokens = word_tokenize(text.lower()) | |
keywords = [ | |
'سيارة', 'سعر', 'موديل', 'سنة', 'صيانة', 'قطع', 'أفضل', 'أكثر', 'شائعة', 'عائلية', 'رخيصة', 'ورقة', | |
'تويوتا', 'هونداي', 'كيا', 'نيسان', 'بغداد', 'البصرة', 'أربيل', 'كرايسلر' | |
] | |
context = {k: k in tokens for k in keywords} | |
context['price'] = next((int(k) * 100 for k in tokens if k.isdigit() and 'ورقة' in tokens), None) | |
context['model_year'] = next((k for k in tokens if k.isdigit() and len(k) == 4 and 2000 <= int(k) <= 2025), None) | |
context['mileage'] = next((k for k in tokens if k.isdigit() and 'ألف' in tokens), None) | |
for term, actual in local_terms.items(): | |
if term in tokens: | |
context['car_name'] = actual | |
return context | |
except Exception as e: | |
logger.error(f"خطأ في تحليل السؤال: {str(e)}") | |
tokens = text.lower().split() | |
context = {k: k in tokens for k in ['سيارة', 'سعر', 'موديل', 'صيانة', 'قطع', 'رخيصة']} | |
context['car_name'] = next((local_terms.get(t) for t in tokens if t in local_terms), None) | |
return context | |
# Start command | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
user_id = str(update.effective_user.id) | |
username = update.effective_user.username or "غير معروف" | |
logger.info(f"User {user_id} ({username}) started the bot") | |
db.add_user(user_id, username) | |
keyboard = [ | |
[InlineKeyboardButton("🚗 إضافة إعلان", callback_data="add_ad")], | |
[InlineKeyboardButton("🔍 البحث عن سيارة", callback_data="search_ad")], | |
[InlineKeyboardButton("🤖 توصية من CarBot", callback_data="car_gpt")], | |
[InlineKeyboardButton("📊 إعلاناتي", callback_data="my_ads")] | |
] | |
if user_id == ADMIN_CHAT_ID: | |
keyboard.append([InlineKeyboardButton("👩💼 لوحة التحكم", callback_data="admin_dashboard")]) | |
message = f"مرحبًا {escape_markdown(username)}\! 😍 {escape_markdown(generate_positive_message())}\nاختري خيار:" | |
await update.message.reply_text( | |
message, | |
reply_markup=InlineKeyboardMarkup(keyboard), | |
parse_mode='MarkdownV2' | |
) | |
# Button callback handler | |
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
query = update.callback_query | |
await query.answer() | |
user_id = str(query.from_user.id) | |
data = query.data | |
logger.info(f"Button callback: {data} by user {user_id}") | |
if data == "main_menu": | |
await start(update, context) | |
return | |
if data == "add_ad": | |
context.user_data["state"] = UserState.AD_INTRO.value | |
context.user_data["ad"] = {} | |
context.user_data["photos"] = [] | |
intro_message = ( | |
"📢 *إضافة إعلان جديد*\n" | |
"لإضافة إعلان، بنحتاج منك المعلومات التالية:\n" | |
"1. اسم السيارة بالعربي\n" | |
"2. اسم السيارة بالإنجليزي\n" | |
"3. السعر\n" | |
"4. الموديل\n" | |
"5. المحرك\n" | |
"6. عداد الأميال\n" | |
"7. الوارد\n" | |
"8. رقم السيارة\n" | |
"9. الحوادث\n" | |
"10. المواصفات\n" | |
"11. الملاحظات\n" | |
"12. الموقع\n" | |
"13. رقم الهاتف\n" | |
"14. صور السيارة (1-5 صور)\n\n" | |
"📌 بناخد المعلومات خطوة بخطوة. هل أنتِ جاهزة؟" | |
) | |
keyboard = [ | |
[InlineKeyboardButton("نعم، جاهزة!", callback_data="ad_ready")], | |
[InlineKeyboardButton("لا، رجعيني للقائمة", callback_data="main_menu")] | |
] | |
await query.message.reply_text( | |
escape_markdown(intro_message), | |
reply_markup=InlineKeyboardMarkup(keyboard), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data == "ad_ready": | |
context.user_data["state"] = UserState.AD_CAR_NAME_AR.value | |
await query.message.reply_text( | |
escape_markdown("📝 أكتبي اسم السيارة بالعربي (مثل: تويوتا كورولا):"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data == "search_ad": | |
context.user_data["state"] = UserState.SEARCHING_AD.value | |
await query.message.reply_text( | |
escape_markdown("🔍 أكتبي تفاصيل البحث (مثل: تويوتا، 2020، بغداد، 20-30 ألف دولار):"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data == "car_gpt": | |
context.bot_data.setdefault("car_gpt_active", {})[user_id] = True | |
context.user_data["state"] = UserState.CAR_GPT.value | |
await query.message.reply_text( | |
escape_markdown("🤖 أكتبي طلبك لـ CarBot (مثل: سيارة رخيصة 50 ورقة، أو أفضل سيارة عائلية):\nاكتبي 'قائمة' للرجوع."), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data == "my_ads": | |
ads = db.get_user_ads(user_id) | |
if not ads: | |
await query.message.reply_text( | |
escape_markdown("😔 ما عندك إعلانات! جربي تضيفي إعلان جديد."), | |
reply_markup=InlineKeyboardMarkup([ | |
[InlineKeyboardButton("🚗 إضافة إعلان", callback_data="add_ad")], | |
[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")] | |
]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
for ad in ads: | |
await query.message.reply_text( | |
format_ad(ad), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if data == "admin_dashboard" and user_id == ADMIN_CHAT_ID: | |
stats = db.get_stats() | |
message = f"📊 *لوحة التحكم:*\n👥 *المستخدمين*: {stats['users']}\n🚗 *الإعلانات*: {stats['ads']}\n🔎 *التوصيات*: {stats.get('recommendations', 0)}" | |
await query.message.reply_text( | |
escape_markdown(message), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if data == "maintenance": | |
context.user_data["state"] = UserState.MAINTENANCE.value | |
await query.message.reply_text( | |
escape_markdown("🛠️ أكتبي اسم السيارة لمعلومات الصيانة (مثل: تويوتا كورولا):"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data == "spare_parts": | |
context.user_data["state"] = UserState.SPARE_PARTS.value | |
await query.message.reply_text( | |
escape_markdown("🔩 أكتبي اسم السيارة لمعلومات قطع الغيار (مثل: تويوتا كورولا):"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
if data.startswith("admin_approve_") and user_id == ADMIN_CHAT_ID: | |
ad_id = data[len("admin_approve_"):] | |
try: | |
ad = db.get_ad(ad_id) | |
ad['reactions'] = generate_reactions(ad) | |
message_id = await post_to_channel(context, CHANNEL_ID, format_ad(ad), ad.get('photos', [None])[0]) | |
if message_id: | |
db.update_ad_status(ad_id, "approved", message_id=message_id) | |
context.job_queue.run_once( | |
delete_channel_post, | |
when=timedelta(days=30), | |
data={"ad_id": ad_id, "message_id": message_id, "channel_id": CHANNEL_ID}, | |
name=f"delete_ad_{ad_id}" | |
) | |
await query.message.reply_text(escape_markdown(f"✅ تم الموافقة على الإعلان {ad_id}\\!"), parse_mode='MarkdownV2') | |
logger.info(f"Ad {ad_id} approved by admin {user_id}") | |
else: | |
await query.message.reply_text(escape_markdown(f"❌ خطأ بنشر الإعلان! تواصلي مع \\{SUPPORT_USERNAME}\\!"), parse_mode='MarkdownV2') | |
logger.error(f"Failed to post ad {ad_id}") | |
except Exception as e: | |
logger.error(f"Error approving ad {ad_id}: {str(e)}") | |
await query.message.reply_text(escape_markdown(f"❌ فشل الموافقة! تواصلي مع \\{SUPPORT_USERNAME}\\!"), parse_mode='MarkdownV2') | |
return | |
if data.startswith("admin_reject_") and user_id == ADMIN_CHAT_ID: | |
ad_id = data[len("admin_reject_"):] | |
try: | |
db.update_ad_status(ad_id, "rejected") | |
await query.message.reply_text(escape_markdown(f"🗑️ تم رفض الإعلان {ad_id}\\!"), parse_mode='MarkdownV2') | |
logger.info(f"Ad {ad_id} rejected by admin {user_id}") | |
except Exception as e: | |
logger.error(f"Error rejecting ad {ad_id}: {str(e)}") | |
await query.message.reply_text(escape_markdown(f"❌ خطأ برفض الإعلان! تواصلي مع \\{SUPPORT_USERNAME}\\!"), parse_mode='MarkdownV2') | |
return | |
if data.startswith("admin_edit_ad_") and user_id == ADMIN_CHAT_ID: | |
ad_id = data[len("admin_edit_ad_"):] | |
context.user_data["state"] = UserState.ADMIN_EDIT.value + f"_{ad_id}" | |
context.user_data["ad_id"] = ad_id | |
await query.message.reply_text( | |
escape_markdown(f"📝 أكتبي التعديلات للإعلان (ID: {ad_id}):"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
return | |
await query.message.reply_text( | |
escape_markdown("❓ خيار غير معروف! جربي من القائمة:"), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]), | |
parse_mode='MarkdownV2' | |
) | |
# CarBot recommendation generator using Falcon-Arabic | |
async def generate_car_recommendation(user_input: str) -> str: | |
cache_key = user_input.lower().strip() | |
if cache_key in recommendation_cache and time.time() - recommendation_cache[cache_key]["timestamp"] < 3600: | |
return recommendation_cache[cache_key]["response"] | |
try: | |
context = analyze_question(user_input) | |
keywords = user_input.lower().split() | |
prompt = None | |
# تحسين المطالبات لتكون أكثر تحديدًا | |
if context.get('ورقة') and context.get('price'): | |
price_dollars = context['price'] | |
prompt = f"ابحث عن سيارة بسعر حوالي {price_dollars} دولار أو أقل في العراق، وأجب بلهجة عراقية طبيعية عن سعرها، مواصفاتها، أماكن بيعها، وتوفر قطع الغيار." | |
elif context.get('صيانة') or context.get('قطع'): | |
car_name = context.get('car_name', next((k for k in keywords if k in [car["name"].split()[0].lower() for car in popular_cars]), user_input)) | |
prompt = f"أجب بلهجة عراقية طبيعية عن صيانة أو قطع غيار السيارة {car_name} في العراق، مع ذكر أماكن الصيانة، التكلفة، وتوفر القطع." | |
elif context.get('أفضل') or context.get('أكثر') or context.get('شائعة'): | |
prompt = f"أجب بلهجة عراقية طبيعية عن أفضل أو أكثر سيارة مبيعاً في العراق، مع ذكر سعرها، مواصفاتها، أماكن بيعها، ولماذا هي شائعة." | |
elif context.get('عائلية'): | |
prompt = f"ابحث عن سيارة عائلية مناسبة في العراق، وأجب بلهجة عراقية طبيعية عن سعرها، مواصفاتها، أماكن بيعها، ولماذا مناسبة للعائلات." | |
elif context.get('model_year'): | |
car_name = context.get('car_name', next((k for k in keywords if k in [car["name"].split()[0].lower() for car in popular_cars]), "غير محدد")) | |
prompt = f"أجب بلهجة عراقية طبيعية عن سيارة {car_name} موديل {context['model_year']} في العراق، مع ذكر سعرها، مواصفاتها، أماكن بيعها، وتوفر قطع الغيار." | |
else: | |
prompt = f"أجب بلهجة عراقية طبيعية عن سؤال حول السيارات في العراق: {user_input}. قدم معلومات عن السعر، المواصفات، أماكن البيع، وقطع الغيار إذا كانت متاحة." | |
# استخدام Falcon-Arabic | |
gen = load_generator() | |
response = gen(prompt, max_length=150, num_return_sequences=1, do_sample=True, temperature=0.7)[0]['generated_text'] | |
response = response.split(".")[0].split("؟")[0].strip() + "." | |
# تنظيف الرد ودمج مع بيانات السكرابر | |
if len(response) < 30 or "غير معروف" in response or "ما اعرف" in response: | |
fallback_response = await generate_fallback_recommendation(user_input, context) | |
recommendation_cache[cache_key] = {"response": fallback_response, "timestamp": time.time()} | |
return fallback_response | |
recommendation_cache[cache_key] = {"response": response, "timestamp": time.time()} | |
return response | |
except Exception as e: | |
logger.error(f"Error in generate_car_recommendation: {str(e)}") | |
fallback_response = await generate_fallback_recommendation(user_input, context) | |
recommendation_cache[cache_key] = {"response": fallback_response, "timestamp": time.time()} | |
return fallback_response | |
# Fallback recommendation with enhanced Iraqi context | |
async def generate_fallback_recommendation(user_input: str, context: dict) -> str: | |
keywords = user_input.lower().split() | |
# البحث في بيانات السكرابر أولاً | |
matched_scraped_car = None | |
if context.get('price'): | |
price_dollars = context['price'] | |
matched_scraped_car = next( | |
(c for c in scraped_cars if any(k in c.get("name", "").lower() for k in keywords) and | |
any(str(price_dollars) in c.get("price", "") for k in keywords)), None | |
) | |
elif context.get('model_year'): | |
matched_scraped_car = next( | |
(c for c in scraped_cars if any(k in c.get("name", "").lower() for k in keywords) and | |
context['model_year'] in c.get("model_year", "")), None | |
) | |
else: | |
matched_scraped_car = next( | |
(c for c in scraped_cars if any(k in c.get("name", "").lower() for k in keywords)), None | |
) | |
if matched_scraped_car: | |
message = ( | |
f"🚗 بناءً على طلبك '{user_input}'، أنصحك بـ {matched_scraped_car.get('name', 'غير محدد')}:\n" | |
f"السعر: {matched_scraped_car.get('price', 'غير محدد')}\n" | |
f"الموديل: {matched_scraped_car.get('model_year', 'غير محدد')}\n" | |
f"الموقع: {matched_scraped_car.get('location', 'غير محدد')}\n" | |
f"المواصفات: {matched_scraped_car.get('description', 'غير محدد')}\n" | |
f"الصيانة: {matched_scraped_car.get('maintenance', 'غير محدد')}\n" | |
f"قطع الغيار: {matched_scraped_car.get('spare_parts', 'غير محدد')}" | |
) | |
return message | |
# إذا ما لقينا في السكرابر، نرجع لـ popular_cars | |
car = random.choice(popular_cars) | |
if context.get('ورقة') and context.get('price'): | |
price_dollars = context['price'] | |
matched_car = next((c for c in popular_cars if int(c["price"].split("-")[1].replace(" دولار", "")) <= price_dollars), car) | |
message = ( | |
f"🚗 إذا بدك سيارة بـ {price_dollars} دولار، أنصحك بـ {matched_car['name']}:\n" | |
f"السعر: {matched_car['price']}\n" | |
f"النوع: {matched_car['type']}\n" | |
f"الصيانة: {matched_car['maintenance']} (بتكلف حوالي {matched_car['maintenance_cost']})\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}\n" | |
f"الشعبية: {matched_car['popularity']}\n" | |
f"تلكينها بـ: {matched_car['location']}\n" | |
f"كفاءة البنزين: {matched_car['fuel_efficiency']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
elif context.get('صيانة') or context.get('قطع'): | |
matched_car = next((c for c in popular_cars if any(k in c["name"].lower() for k in keywords)), car) | |
message = ( | |
f"🚗 معلومات صيانة وقطع غيار لسيارة {matched_car['name']}:\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"تكلفة الصيانة: {matched_car['maintenance_cost']}\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
elif context.get('أفضل') or context.get('أكثر') or context.get('شائعة'): | |
matched_car = next((c for c in popular_cars if "أكثر" in c["popularity"]), car) | |
message = ( | |
f"🚗 أكثر سيارة شائعة في العراق هي {matched_car['name']}:\n" | |
f"السعر: {matched_car['price']}\n" | |
f"النوع: {matched_car['type']}\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"الشعبية: {matched_car['popularity']}\n" | |
f"تلكينها بـ: {matched_car['location']}\n" | |
f"كفاءة البنزين: {matched_car['fuel_efficiency']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
elif context.get('عائلية'): | |
matched_car = next((c for c in popular_cars if "عائلات" in c["suitable_for"] or "دفع رباعي" in c["type"]), car) | |
message = ( | |
f"🚗 إذا بدك سيارة عائلية، أنصحك بـ {matched_car['name']}:\n" | |
f"السعر: {matched_car['price']}\n" | |
f"النوع: {matched_car['type']}\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"الشعبية: {matched_car['popularity']}\n" | |
f"تلكينها بـ: {matched_car['location']}\n" | |
f"كفاءة البنزين: {matched_car['fuel_efficiency']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
elif context.get('model_year'): | |
matched_car = next((c for c in popular_cars if context['model_year'] in c["model_year"]), car) | |
message = ( | |
f"🚗 بناءً على طلبك '{user_input}'، أنصحك بـ {matched_car['name']}:\n" | |
f"السعر: {matched_car['price']}\n" | |
f"النوع: {matched_car['type']}\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"الشعبية: {matched_car['popularity']}\n" | |
f"تلكينها بـ: {matched_car['location']}\n" | |
f"كفاءة البنزين: {matched_car['fuel_efficiency']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
else: | |
matched_car = next((c for c in popular_cars if any(k in c["name"].lower() for k in keywords)), car) | |
message = ( | |
f"🔍 ما لقيت معلومات دقيقة عن '{user_input}'، بس أنصحك بـ {matched_car['name']}:\n" | |
f"السعر: {matched_car['price']}\n" | |
f"النوع: {matched_car['type']}\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"الشعبية: {matched_car['popularity']}\n" | |
f"تلكينها بـ: {matched_car['location']}\n" | |
f"كفاءة البنزين: {matched_car['fuel_efficiency']}\n" | |
f"قطع الغيار: {matched_car['spare_parts']}" | |
) | |
return message | |
# Message handler | |
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
user_id = str(update.effective_user.id) | |
state = context.user_data.get("state") | |
logger.info(f"Message from user {user_id} in state {state}") | |
if update.message.photo and state == UserState.AD_PHOTOS.value: | |
if len(context.user_data["photos"]) >= 5: | |
await update.message.reply_text( | |
escape_markdown("❌ وصلتِ للحد الأقصى (5 صور)! أرسلي 'تم' للمتابعة أو 'قائمة' للرجوع:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
context.user_data["photos"].append(update.message.photo[-1].file_id) | |
await update.message.reply_text( | |
escape_markdown(f"📸 تم استلام صورة ({len(context.user_data['photos'])}/5). أرسلي صورة ثانية أو اكتبي 'تم' إذا خلّصتِ:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if not update.message.text: | |
await update.message.reply_text( | |
escape_markdown("❌ أرسلي نص أو صورة حسب الخطوة!"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
text = update.message.text.strip() | |
logger.info(f"Message from user {user_id} in state {state}: {text}") | |
if state == UserState.CAR_GPT.value: | |
if text.lower() == "قائمة": | |
await start(update, context) | |
return | |
recommendation = await generate_car_recommendation(text) | |
await update.message.reply_text( | |
escape_markdown(recommendation), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([ | |
[InlineKeyboardButton("صيانة", callback_data="maintenance")], | |
[InlineKeyboardButton("قطع غيار", callback_data="spare_parts")], | |
[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")] | |
]) | |
) | |
return | |
if state == UserState.SEARCHING_AD.value: | |
ads = db.search_ads(text) | |
if not ads: | |
await update.message.reply_text( | |
escape_markdown("😔 ما لقيت إعلانات تطابق بحثك! جربي كلمات ثانية:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
for ad in ads: | |
await update.message.reply_text( | |
format_ad(ad), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.MAINTENANCE.value: | |
context = analyze_question(text) | |
car_name = context.get('car_name', text) | |
prompt = f"أجب بلهجة عراقية طبيعية عن صيانة السيارة {car_name} في العراق، مع ذكر أماكن الصيانة والتكلفة إن أمكن." | |
try: | |
gen = load_generator() | |
response = gen(prompt, max_length=150, num_return_sequences=1, do_sample=True, temperature=0.7)[0]['generated_text'] | |
response = response.split(".")[0].split("؟")[0].strip() + "." | |
if len(response) < 30: | |
matched_car = next((c for c in popular_cars if car_name.lower() in c["name"].lower()), random.choice(popular_cars)) | |
response = ( | |
f"🚗 معلومات صيانة لسيارة {matched_car['name']}:\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"تكلفة الصيانة: {matched_car['maintenance_cost']}\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}" | |
) | |
await update.message.reply_text( | |
escape_markdown(response), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
except Exception as e: | |
logger.error(f"Error fetching maintenance info: {str(e)}") | |
matched_car = next((c for c in popular_cars if car_name.lower() in c["name"].lower()), random.choice(popular_cars)) | |
message = ( | |
f"🚗 معلومات صيانة لسيارة {matched_car['name']}:\n" | |
f"الصيانة: {matched_car['maintenance']}\n" | |
f"تكلفة الصيانة: {matched_car['maintenance_cost']}\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}" | |
) | |
await update.message.reply_text( | |
escape_markdown(message), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.SPARE_PARTS.value: | |
context = analyze_question(text) | |
car_name = context.get('car_name', text) | |
prompt = f"أجب بلهجة عراقية طبيعية عن قطع غيار السيارة {car_name} في العراق، مع ذكر أماكن التوفر والأسعار إن أمكن." | |
try: | |
gen = load_generator() | |
response = gen(prompt, max_length=150, num_return_sequences=1, do_sample=True, temperature=0.7)[0]['generated_text'] | |
response = response.split(".")[0].split("؟")[0].strip() + "." | |
if len(response) < 30: | |
matched_car = next((c for c in popular_cars if car_name.lower() in c["name"].lower()), random.choice(popular_cars)) | |
response = ( | |
f"🔩 قطع غيار لسيارة {matched_car['name']}:\n" | |
f"التوفر: {matched_car['spare_parts']}\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}" | |
) | |
await update.message.reply_text( | |
escape_markdown(response), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
except Exception as e: | |
logger.error(f"Error fetching spare parts info: {str(e)}") | |
matched_car = next((c for c in popular_cars if car_name.lower() in c["name"].lower()), random.choice(popular_cars)) | |
message = ( | |
f"🔩 قطع غيار لسيارة {matched_car['name']}:\n" | |
f"التوفر: {matched_car['spare_parts']}\n" | |
f"أماكن الصيانة: {matched_car['maintenance_locations']}" | |
) | |
await update.message.reply_text( | |
escape_markdown(message), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_CAR_NAME_AR.value: | |
context.user_data["ad"]["car_name_ar"] = text | |
context.user_data["state"] = UserState.AD_CAR_NAME_EN.value | |
await update.message.reply_text( | |
escape_markdown("📝 أكتبي اسم السيارة بالإنجليزي (مثل: Toyota Corolla):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_CAR_NAME_EN.value: | |
context.user_data["ad"]["car_name_en"] = text | |
context.user_data["state"] = UserState.AD_PRICE.value | |
await update.message.reply_text( | |
escape_markdown("💰 أكتبي السعر (مثل: 25 ألف دولار):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_PRICE.value: | |
if not any(char.isdigit() for char in text): | |
await update.message.reply_text( | |
escape_markdown("❌ السعر لازم يحتوي على أرقام! جربي مرة ثانية:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
context.user_data["ad"]["price"] = text | |
context.user_data["state"] = UserState.AD_MODEL.value | |
await update.message.reply_text( | |
escape_markdown("📅 أكتبي الموديل (مثل: 2020):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_MODEL.value: | |
if not text.isdigit(): | |
await update.message.reply_text( | |
escape_markdown("❌ الموديل لازم يكون رقم! جربي مرة ثانية:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
context.user_data["ad"]["model"] = text | |
context.user_data["state"] = UserState.AD_ENGINE_SPECS.value | |
await update.message.reply_text( | |
escape_markdown("⚙️ أكتبي مواصفات المحرك (مثل: 1.8L):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_ENGINE_SPECS.value: | |
context.user_data["ad"]["engine_specs"] = text | |
context.user_data["state"] = UserState.AD_MILEAGE.value | |
await update.message.reply_text( | |
escape_markdown("🛣️ أكتبي عداد الأميال (مثل: 50 ألف كم):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_MILEAGE.value: | |
context.user_data["ad"]["mileage"] = text | |
context.user_data["state"] = UserState.AD_ORIGIN.value | |
await update.message.reply_text( | |
escape_markdown("🌎 أكتبي الوارد (مثل: أمريكي):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_ORIGIN.value: | |
context.user_data["ad"]["origin"] = text | |
context.user_data["state"] = UserState.AD_PLATE.value | |
await update.message.reply_text( | |
escape_markdown("🔢 أكتبي رقم السيارة (مثل: بغداد 123):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_PLATE.value: | |
context.user_data["ad"]["plate"] = text | |
context.user_data["state"] = UserState.AD_ACCIDENTS.value | |
await update.message.reply_text( | |
escape_markdown("🚨 أكتبي حالة الحوادث (مثل: بدون حوادث):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_ACCIDENTS.value: | |
context.user_data["ad"]["accidents"] = text | |
context.user_data["state"] = UserState.AD_SPECS.value | |
await update.message.reply_text( | |
escape_markdown("🛠️ أكتبي المواصفات (مثل: فل مواصفات):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_SPECS.value: | |
context.user_data["ad"]["specs"] = text | |
context.user_data["state"] = UserState.AD_NOTES.value | |
await update.message.reply_text( | |
escape_markdown("📋 أكتبي الملاحظات (مثل: نظيفة، أو اتركيها فاضية إذا ما في):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_NOTES.value: | |
context.user_data["ad"]["notes"] = text if text else "غير محدد" | |
context.user_data["state"] = UserState.AD_LOCATION.value | |
await update.message.reply_text( | |
escape_markdown("📍 أكتبي الموقع (مثل: بغداد):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_LOCATION.value: | |
context.user_data["ad"]["location"] = text | |
context.user_data["state"] = UserState.AD_PHONE.value | |
await update.message.reply_text( | |
escape_markdown("📞 أكتبي رقم الهاتف (مثال: +9647712345678):"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_PHONE.value: | |
phone_number, error_message = validate_phone(text) | |
if not phone_number: | |
await update.message.reply_text( | |
escape_markdown(error_message), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
logger.warning(f"Invalid phone by user {user_id}: {text}") | |
return | |
context.user_data["ad"]["phone"] = phone_number | |
context.user_data["state"] = UserState.AD_PHOTOS.value | |
await update.message.reply_text( | |
escape_markdown("📸 أرسلي صور السيارة (من 1 إلى 5 صور). أرسلي صورة واحدة في كل رسالة، وبعد ما تخلّصي أرسلي 'تم':"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
if state == UserState.AD_PHOTOS.value: | |
if text.lower() == "تم": | |
if not context.user_data["photos"]: | |
await update.message.reply_text( | |
escape_markdown("❌ لازم ترسلي صورة وحدة على الأقل! أرسلي صورة أو اكتبي 'قائمة' للرجوع:"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
return | |
ad = context.user_data["ad"] | |
ad["photos"] = context.user_data["photos"] | |
ad["user_id"] = user_id | |
ad["created_at"] = datetime.now() | |
ad_id = db.add_ad(ad) | |
await send_admin_notification( | |
context.bot, | |
ADMIN_CHAT_ID, | |
f"إعلان جديد من {escape_markdown(update.effective_user.username or 'غير معروف')}:\n{format_ad(ad)}", | |
ad_id | |
) | |
await update.message.reply_text( | |
escape_markdown("✅ تم إرسال الإعلان للمراجعة! بنخبرك لما ينعتمد."), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
context.user_data.clear() | |
return | |
if state and state.startswith(UserState.ADMIN_EDIT.value): | |
ad_id = context.user_data.get("ad_id") | |
try: | |
updates = {} | |
for field_value in text.split("\n"): | |
if ":" in field_value: | |
field, value = field_value.split(":", 1) | |
updates[field.strip()] = value.strip() | |
db.update_ad(ad_id, updates) | |
await update.message.reply_text( | |
escape_markdown(f"✅ تم تعديل الإعلان {ad_id}\\!"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
context.user_data.clear() | |
except Exception as e: | |
logger.error(f"Error editing ad {ad_id}: {str(e)}") | |
await update.message.reply_text( | |
escape_markdown(f"❌ خطأ بتعديل الإعلان! تواصلي مع \\{SUPPORT_USERNAME}\\!"), | |
parse_mode='MarkdownV2' | |
) | |
return | |
await update.message.reply_text( | |
escape_markdown("❓ أكتبي شي يناسب الخطوة أو ارجعي للقائمة!"), | |
parse_mode='MarkdownV2', | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🏠 القائمة", callback_data="main_menu")]]) | |
) | |
# Main function to run the bot | |
def main(): | |
application = Application.builder().token(TELEGRAM_TOKEN).build() | |
application.add_handler(CommandHandler("start", start)) | |
application.add_handler(CallbackQueryHandler(button_callback)) | |
application.add_handler(MessageHandler(filters.ALL & ~filters.COMMAND, handle_message)) | |
application.run_polling(allowed_updates=Update.ALL_TYPES) | |
if __name__ == "__main__": | |
main() | |