Spaces:
Paused
Paused
File size: 6,656 Bytes
4e127d3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import re
import random
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import TelegramError
import logging
from typing import Dict, Any, Optional, Tuple
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler("bot.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def escape_markdown(text: str) -> str:
if not text:
return ""
# إضافة دعم للهروب من المزيد من الأحرف الخاصة في MarkdownV2
escape_chars = r'_*[]()~`>#+=|{.!-}'
return ''.join(f'\\{char}' if char in escape_chars else char for char in str(text))
def generate_positive_message() -> str:
messages = [
"🚗 يلا نلقى سيارة أحلامك! 😍",
"💥 جاهزة لأحلى صفقة بالعراق! 🌟",
"🔥 بيع وشراء السيارات صار متعة! 🚘",
"😎 مع CarBot، كل شي أسهل! 💨"
]
return random.choice(messages)
def format_ad(ad: Dict[str, Any]) -> str:
if not ad:
return escape_markdown("❌ لا توجد بيانات!")
reactions = ad.get('reactions', '')
photos = ad.get('photos', [])
photo_count = len(photos) if photos else 0
fields = [
f"*السيارة*: {escape_markdown(ad.get('car_name_ar', ''))} \\({escape_markdown(ad.get('car_name_en', ''))}\\)",
f"*السعر*: {escape_markdown(ad.get('price', ''))}",
f"*الموديل*: {escape_markdown(ad.get('model', ''))}",
f"*المحرك*: {escape_markdown(ad.get('engine_specs', ''))}",
f"*عداد الأميال*: {escape_markdown(ad.get('mileage', ''))}",
f"*الوارد*: {escape_markdown(ad.get('origin', ''))}",
f"*رقم السيارة*: {escape_markdown(ad.get('plate', ''))}",
f"*الحوادث*: {escape_markdown(ad.get('accidents', ''))}",
f"*المواصفات*: {escape_markdown(ad.get('specs', ''))}",
f"*الملاحظات*: {escape_markdown(ad.get('notes', 'غير محدد'))}",
f"*الموقع*: {escape_markdown(ad.get('location', ''))}",
f"*رقم الهاتف*: {escape_markdown(ad.get('phone', ''))}",
f"*عدد الصور*: {photo_count}",
f"{reactions}"
]
return "\n".join(fields)
def validate_phone(phone: str) -> Tuple[Optional[str], Optional[str]]:
if not phone:
logger.warning("Empty phone number provided")
return None, "❌ أدخلي رقم هاتف صحيح (مثال: +9647712345678)"
try:
arabic_to_latin = str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789')
phone = phone.translate(arabic_to_latin)
phone = re.sub(r'[\s\-]', '', phone)
# تحقق إضافي للتأكد إن الرقم يبدأ بـ +964 أو 0
if not (phone.startswith('+964') or phone.startswith('0')):
return None, "❌ الرقم لازم يبدأ بـ +964 أو 0! جربي مرة ثانية"
pattern = r'^(\+964|0)(77[0-7]|78[0-9]|79[0-9]|75[0-1][0-9])[0-9]{7}$'
if re.match(pattern, phone):
if not phone.startswith('+964'):
phone = '+964' + phone.lstrip('0')
return phone, None
logger.warning(f"Invalid phone number format: {phone}")
return None, "❌ رقم غير صحيح! جربي مثال: +9647712345678"
except Exception as e:
logger.error(f"Error validating phone number: {str(e)}")
return None, "❌ خطأ بالتحقق من الرقم! جربي مرة ثانية"
async def send_admin_notification(bot, admin_id: str, message: str, ad_id: Optional[int] = None):
if not admin_id:
logger.error("Admin chat ID not set")
return
try:
keyboard = []
if ad_id:
keyboard.append([
InlineKeyboardButton("✅ الموافقة", callback_data=f"admin_approve_{ad_id}"),
InlineKeyboardButton("❌ الرفض", callback_data=f"admin_reject_{ad_id}"),
InlineKeyboardButton("✏️ تعديل", callback_data=f"admin_edit_ad_{ad_id}")
])
await bot.send_message(
chat_id=admin_id,
text=escape_markdown(message),
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode='MarkdownV2'
)
logger.info(f"Admin notification sent: {message}")
except TelegramError as e:
logger.error(f"Failed to send admin notification: {str(e)}")
async def post_to_channel(context, channel_id: str, ad_text: str, photo: Optional[str] = None) -> Optional[int]:
try:
if photo:
message = await context.bot.send_photo(
chat_id=channel_id,
photo=photo,
caption=ad_text,
parse_mode='MarkdownV2'
)
else:
message = await context.bot.send_message(
chat_id=channel_id,
text=ad_text,
parse_mode='MarkdownV2'
)
logger.info(f"Posted to channel {channel_id}: {message.message_id}")
return message.message_id
except TelegramError as e:
logger.error(f"Failed to post to channel {channel_id}: {str(e)}")
return None
async def delete_channel_post(context):
from database import Database
try:
job_data = context.job.data
await context.bot.delete_message(
chat_id=job_data["channel_id"],
message_id=job_data["message_id"]
)
db = Database()
db.update_ad_status(job_data["ad_id"], "expired")
logger.info(f"Deleted channel post {job_data['message_id']} for ad {job_data['ad_id']}")
except TelegramError as e:
logger.error(f"Failed to delete channel post: {str(e)}")
def generate_reactions(ad: Dict[str, Any]) -> str:
reactions = ['👍', '❤️', '⭐', '🔥']
try:
price = float(re.sub(r'[^\d.]', '', ad.get('price', '0')))
num_reactions = random.randint(5, min(15, int(price / 1000)))
selected_reactions = random.choices(reactions, k=num_reactions)
reaction_counts = {r: selected_reactions.count(r) for r in set(selected_reactions)}
return ''.join([f"{r}{count} " for r, count in reaction_counts.items()])
except Exception as e:
logger.error(f"Error generating reactions: {str(e)}")
return ""
|