import re import random from telegram import InlineKeyboardButton, InlineKeyboardMarkup from telegram.error import TelegramError import logging from typing import Dict, Any, Optional, Tuple logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[ logging.FileHandler("bot.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def escape_markdown(text: str) -> str: if not text: return "" # إضافة دعم للهروب من المزيد من الأحرف الخاصة في MarkdownV2 escape_chars = r'_*[]()~`>#+=|{.!-}' return ''.join(f'\\{char}' if char in escape_chars else char for char in str(text)) def generate_positive_message() -> str: messages = [ "🚗 يلا نلقى سيارة أحلامك! 😍", "💥 جاهزة لأحلى صفقة بالعراق! 🌟", "🔥 بيع وشراء السيارات صار متعة! 🚘", "😎 مع CarBot، كل شي أسهل! 💨" ] return random.choice(messages) def format_ad(ad: Dict[str, Any]) -> str: if not ad: return escape_markdown("❌ لا توجد بيانات!") reactions = ad.get('reactions', '') photos = ad.get('photos', []) photo_count = len(photos) if photos else 0 fields = [ f"*السيارة*: {escape_markdown(ad.get('car_name_ar', ''))} \\({escape_markdown(ad.get('car_name_en', ''))}\\)", f"*السعر*: {escape_markdown(ad.get('price', ''))}", f"*الموديل*: {escape_markdown(ad.get('model', ''))}", f"*المحرك*: {escape_markdown(ad.get('engine_specs', ''))}", f"*عداد الأميال*: {escape_markdown(ad.get('mileage', ''))}", f"*الوارد*: {escape_markdown(ad.get('origin', ''))}", f"*رقم السيارة*: {escape_markdown(ad.get('plate', ''))}", f"*الحوادث*: {escape_markdown(ad.get('accidents', ''))}", f"*المواصفات*: {escape_markdown(ad.get('specs', ''))}", f"*الملاحظات*: {escape_markdown(ad.get('notes', 'غير محدد'))}", f"*الموقع*: {escape_markdown(ad.get('location', ''))}", f"*رقم الهاتف*: {escape_markdown(ad.get('phone', ''))}", f"*عدد الصور*: {photo_count}", f"{reactions}" ] return "\n".join(fields) def validate_phone(phone: str) -> Tuple[Optional[str], Optional[str]]: if not phone: logger.warning("Empty phone number provided") return None, "❌ أدخلي رقم هاتف صحيح (مثال: +9647712345678)" try: arabic_to_latin = str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789') phone = phone.translate(arabic_to_latin) phone = re.sub(r'[\s\-]', '', phone) # تحقق إضافي للتأكد إن الرقم يبدأ بـ +964 أو 0 if not (phone.startswith('+964') or phone.startswith('0')): return None, "❌ الرقم لازم يبدأ بـ +964 أو 0! جربي مرة ثانية" pattern = r'^(\+964|0)(77[0-7]|78[0-9]|79[0-9]|75[0-1][0-9])[0-9]{7}$' if re.match(pattern, phone): if not phone.startswith('+964'): phone = '+964' + phone.lstrip('0') return phone, None logger.warning(f"Invalid phone number format: {phone}") return None, "❌ رقم غير صحيح! جربي مثال: +9647712345678" except Exception as e: logger.error(f"Error validating phone number: {str(e)}") return None, "❌ خطأ بالتحقق من الرقم! جربي مرة ثانية" async def send_admin_notification(bot, admin_id: str, message: str, ad_id: Optional[int] = None): if not admin_id: logger.error("Admin chat ID not set") return try: keyboard = [] if ad_id: keyboard.append([ InlineKeyboardButton("✅ الموافقة", callback_data=f"admin_approve_{ad_id}"), InlineKeyboardButton("❌ الرفض", callback_data=f"admin_reject_{ad_id}"), InlineKeyboardButton("✏️ تعديل", callback_data=f"admin_edit_ad_{ad_id}") ]) await bot.send_message( chat_id=admin_id, text=escape_markdown(message), reply_markup=InlineKeyboardMarkup(keyboard), parse_mode='MarkdownV2' ) logger.info(f"Admin notification sent: {message}") except TelegramError as e: logger.error(f"Failed to send admin notification: {str(e)}") async def post_to_channel(context, channel_id: str, ad_text: str, photo: Optional[str] = None) -> Optional[int]: try: if photo: message = await context.bot.send_photo( chat_id=channel_id, photo=photo, caption=ad_text, parse_mode='MarkdownV2' ) else: message = await context.bot.send_message( chat_id=channel_id, text=ad_text, parse_mode='MarkdownV2' ) logger.info(f"Posted to channel {channel_id}: {message.message_id}") return message.message_id except TelegramError as e: logger.error(f"Failed to post to channel {channel_id}: {str(e)}") return None async def delete_channel_post(context): from database import Database try: job_data = context.job.data await context.bot.delete_message( chat_id=job_data["channel_id"], message_id=job_data["message_id"] ) db = Database() db.update_ad_status(job_data["ad_id"], "expired") logger.info(f"Deleted channel post {job_data['message_id']} for ad {job_data['ad_id']}") except TelegramError as e: logger.error(f"Failed to delete channel post: {str(e)}") def generate_reactions(ad: Dict[str, Any]) -> str: reactions = ['👍', '❤️', '⭐', '🔥'] try: price = float(re.sub(r'[^\d.]', '', ad.get('price', '0'))) num_reactions = random.randint(5, min(15, int(price / 1000))) selected_reactions = random.choices(reactions, k=num_reactions) reaction_counts = {r: selected_reactions.count(r) for r in set(selected_reactions)} return ''.join([f"{r}{count} " for r, count in reaction_counts.items()]) except Exception as e: logger.error(f"Error generating reactions: {str(e)}") return ""