Spaces:
Running
Running
import re | |
import random | |
from telegram import InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.error import TelegramError | |
import logging | |
from typing import Dict, Any, Optional, Tuple | |
logging.basicConfig( | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
level=logging.INFO, | |
handlers=[ | |
logging.FileHandler("bot.log", encoding='utf-8'), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
def escape_markdown(text: str) -> str: | |
if not text: | |
return "" | |
# إضافة دعم للهروب من المزيد من الأحرف الخاصة في MarkdownV2 | |
escape_chars = r'_*[]()~`>#+=|{.!-}' | |
return ''.join(f'\\{char}' if char in escape_chars else char for char in str(text)) | |
def generate_positive_message() -> str: | |
messages = [ | |
"🚗 يلا نلقى سيارة أحلامك! 😍", | |
"💥 جاهزة لأحلى صفقة بالعراق! 🌟", | |
"🔥 بيع وشراء السيارات صار متعة! 🚘", | |
"😎 مع CarBot، كل شي أسهل! 💨" | |
] | |
return random.choice(messages) | |
def format_ad(ad: Dict[str, Any]) -> str: | |
if not ad: | |
return escape_markdown("❌ لا توجد بيانات!") | |
reactions = ad.get('reactions', '') | |
photos = ad.get('photos', []) | |
photo_count = len(photos) if photos else 0 | |
fields = [ | |
f"*السيارة*: {escape_markdown(ad.get('car_name_ar', ''))} \\({escape_markdown(ad.get('car_name_en', ''))}\\)", | |
f"*السعر*: {escape_markdown(ad.get('price', ''))}", | |
f"*الموديل*: {escape_markdown(ad.get('model', ''))}", | |
f"*المحرك*: {escape_markdown(ad.get('engine_specs', ''))}", | |
f"*عداد الأميال*: {escape_markdown(ad.get('mileage', ''))}", | |
f"*الوارد*: {escape_markdown(ad.get('origin', ''))}", | |
f"*رقم السيارة*: {escape_markdown(ad.get('plate', ''))}", | |
f"*الحوادث*: {escape_markdown(ad.get('accidents', ''))}", | |
f"*المواصفات*: {escape_markdown(ad.get('specs', ''))}", | |
f"*الملاحظات*: {escape_markdown(ad.get('notes', 'غير محدد'))}", | |
f"*الموقع*: {escape_markdown(ad.get('location', ''))}", | |
f"*رقم الهاتف*: {escape_markdown(ad.get('phone', ''))}", | |
f"*عدد الصور*: {photo_count}", | |
f"{reactions}" | |
] | |
return "\n".join(fields) | |
def validate_phone(phone: str) -> Tuple[Optional[str], Optional[str]]: | |
if not phone: | |
logger.warning("Empty phone number provided") | |
return None, "❌ أدخلي رقم هاتف صحيح (مثال: +9647712345678)" | |
try: | |
arabic_to_latin = str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789') | |
phone = phone.translate(arabic_to_latin) | |
phone = re.sub(r'[\s\-]', '', phone) | |
# تحقق إضافي للتأكد إن الرقم يبدأ بـ +964 أو 0 | |
if not (phone.startswith('+964') or phone.startswith('0')): | |
return None, "❌ الرقم لازم يبدأ بـ +964 أو 0! جربي مرة ثانية" | |
pattern = r'^(\+964|0)(77[0-7]|78[0-9]|79[0-9]|75[0-1][0-9])[0-9]{7}$' | |
if re.match(pattern, phone): | |
if not phone.startswith('+964'): | |
phone = '+964' + phone.lstrip('0') | |
return phone, None | |
logger.warning(f"Invalid phone number format: {phone}") | |
return None, "❌ رقم غير صحيح! جربي مثال: +9647712345678" | |
except Exception as e: | |
logger.error(f"Error validating phone number: {str(e)}") | |
return None, "❌ خطأ بالتحقق من الرقم! جربي مرة ثانية" | |
async def send_admin_notification(bot, admin_id: str, message: str, ad_id: Optional[int] = None): | |
if not admin_id: | |
logger.error("Admin chat ID not set") | |
return | |
try: | |
keyboard = [] | |
if ad_id: | |
keyboard.append([ | |
InlineKeyboardButton("✅ الموافقة", callback_data=f"admin_approve_{ad_id}"), | |
InlineKeyboardButton("❌ الرفض", callback_data=f"admin_reject_{ad_id}"), | |
InlineKeyboardButton("✏️ تعديل", callback_data=f"admin_edit_ad_{ad_id}") | |
]) | |
await bot.send_message( | |
chat_id=admin_id, | |
text=escape_markdown(message), | |
reply_markup=InlineKeyboardMarkup(keyboard), | |
parse_mode='MarkdownV2' | |
) | |
logger.info(f"Admin notification sent: {message}") | |
except TelegramError as e: | |
logger.error(f"Failed to send admin notification: {str(e)}") | |
async def post_to_channel(context, channel_id: str, ad_text: str, photo: Optional[str] = None) -> Optional[int]: | |
try: | |
if photo: | |
message = await context.bot.send_photo( | |
chat_id=channel_id, | |
photo=photo, | |
caption=ad_text, | |
parse_mode='MarkdownV2' | |
) | |
else: | |
message = await context.bot.send_message( | |
chat_id=channel_id, | |
text=ad_text, | |
parse_mode='MarkdownV2' | |
) | |
logger.info(f"Posted to channel {channel_id}: {message.message_id}") | |
return message.message_id | |
except TelegramError as e: | |
logger.error(f"Failed to post to channel {channel_id}: {str(e)}") | |
return None | |
async def delete_channel_post(context): | |
from database import Database | |
try: | |
job_data = context.job.data | |
await context.bot.delete_message( | |
chat_id=job_data["channel_id"], | |
message_id=job_data["message_id"] | |
) | |
db = Database() | |
db.update_ad_status(job_data["ad_id"], "expired") | |
logger.info(f"Deleted channel post {job_data['message_id']} for ad {job_data['ad_id']}") | |
except TelegramError as e: | |
logger.error(f"Failed to delete channel post: {str(e)}") | |
def generate_reactions(ad: Dict[str, Any]) -> str: | |
reactions = ['👍', '❤️', '⭐', '🔥'] | |
try: | |
price = float(re.sub(r'[^\d.]', '', ad.get('price', '0'))) | |
num_reactions = random.randint(5, min(15, int(price / 1000))) | |
selected_reactions = random.choices(reactions, k=num_reactions) | |
reaction_counts = {r: selected_reactions.count(r) for r in set(selected_reactions)} | |
return ''.join([f"{r}{count} " for r, count in reaction_counts.items()]) | |
except Exception as e: | |
logger.error(f"Error generating reactions: {str(e)}") | |
return "" | |