carbot-mixtral / utils.py
imstevenleo's picture
Upload 4 files
4e127d3 verified
import re
import random
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import TelegramError
import logging
from typing import Dict, Any, Optional, Tuple
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler("bot.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def escape_markdown(text: str) -> str:
if not text:
return ""
# إضافة دعم للهروب من المزيد من الأحرف الخاصة في MarkdownV2
escape_chars = r'_*[]()~`>#+=|{.!-}'
return ''.join(f'\\{char}' if char in escape_chars else char for char in str(text))
def generate_positive_message() -> str:
messages = [
"🚗 يلا نلقى سيارة أحلامك! 😍",
"💥 جاهزة لأحلى صفقة بالعراق! 🌟",
"🔥 بيع وشراء السيارات صار متعة! 🚘",
"😎 مع CarBot، كل شي أسهل! 💨"
]
return random.choice(messages)
def format_ad(ad: Dict[str, Any]) -> str:
if not ad:
return escape_markdown("❌ لا توجد بيانات!")
reactions = ad.get('reactions', '')
photos = ad.get('photos', [])
photo_count = len(photos) if photos else 0
fields = [
f"*السيارة*: {escape_markdown(ad.get('car_name_ar', ''))} \\({escape_markdown(ad.get('car_name_en', ''))}\\)",
f"*السعر*: {escape_markdown(ad.get('price', ''))}",
f"*الموديل*: {escape_markdown(ad.get('model', ''))}",
f"*المحرك*: {escape_markdown(ad.get('engine_specs', ''))}",
f"*عداد الأميال*: {escape_markdown(ad.get('mileage', ''))}",
f"*الوارد*: {escape_markdown(ad.get('origin', ''))}",
f"*رقم السيارة*: {escape_markdown(ad.get('plate', ''))}",
f"*الحوادث*: {escape_markdown(ad.get('accidents', ''))}",
f"*المواصفات*: {escape_markdown(ad.get('specs', ''))}",
f"*الملاحظات*: {escape_markdown(ad.get('notes', 'غير محدد'))}",
f"*الموقع*: {escape_markdown(ad.get('location', ''))}",
f"*رقم الهاتف*: {escape_markdown(ad.get('phone', ''))}",
f"*عدد الصور*: {photo_count}",
f"{reactions}"
]
return "\n".join(fields)
def validate_phone(phone: str) -> Tuple[Optional[str], Optional[str]]:
if not phone:
logger.warning("Empty phone number provided")
return None, "❌ أدخلي رقم هاتف صحيح (مثال: +9647712345678)"
try:
arabic_to_latin = str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789')
phone = phone.translate(arabic_to_latin)
phone = re.sub(r'[\s\-]', '', phone)
# تحقق إضافي للتأكد إن الرقم يبدأ بـ +964 أو 0
if not (phone.startswith('+964') or phone.startswith('0')):
return None, "❌ الرقم لازم يبدأ بـ +964 أو 0! جربي مرة ثانية"
pattern = r'^(\+964|0)(77[0-7]|78[0-9]|79[0-9]|75[0-1][0-9])[0-9]{7}$'
if re.match(pattern, phone):
if not phone.startswith('+964'):
phone = '+964' + phone.lstrip('0')
return phone, None
logger.warning(f"Invalid phone number format: {phone}")
return None, "❌ رقم غير صحيح! جربي مثال: +9647712345678"
except Exception as e:
logger.error(f"Error validating phone number: {str(e)}")
return None, "❌ خطأ بالتحقق من الرقم! جربي مرة ثانية"
async def send_admin_notification(bot, admin_id: str, message: str, ad_id: Optional[int] = None):
if not admin_id:
logger.error("Admin chat ID not set")
return
try:
keyboard = []
if ad_id:
keyboard.append([
InlineKeyboardButton("✅ الموافقة", callback_data=f"admin_approve_{ad_id}"),
InlineKeyboardButton("❌ الرفض", callback_data=f"admin_reject_{ad_id}"),
InlineKeyboardButton("✏️ تعديل", callback_data=f"admin_edit_ad_{ad_id}")
])
await bot.send_message(
chat_id=admin_id,
text=escape_markdown(message),
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode='MarkdownV2'
)
logger.info(f"Admin notification sent: {message}")
except TelegramError as e:
logger.error(f"Failed to send admin notification: {str(e)}")
async def post_to_channel(context, channel_id: str, ad_text: str, photo: Optional[str] = None) -> Optional[int]:
try:
if photo:
message = await context.bot.send_photo(
chat_id=channel_id,
photo=photo,
caption=ad_text,
parse_mode='MarkdownV2'
)
else:
message = await context.bot.send_message(
chat_id=channel_id,
text=ad_text,
parse_mode='MarkdownV2'
)
logger.info(f"Posted to channel {channel_id}: {message.message_id}")
return message.message_id
except TelegramError as e:
logger.error(f"Failed to post to channel {channel_id}: {str(e)}")
return None
async def delete_channel_post(context):
from database import Database
try:
job_data = context.job.data
await context.bot.delete_message(
chat_id=job_data["channel_id"],
message_id=job_data["message_id"]
)
db = Database()
db.update_ad_status(job_data["ad_id"], "expired")
logger.info(f"Deleted channel post {job_data['message_id']} for ad {job_data['ad_id']}")
except TelegramError as e:
logger.error(f"Failed to delete channel post: {str(e)}")
def generate_reactions(ad: Dict[str, Any]) -> str:
reactions = ['👍', '❤️', '⭐', '🔥']
try:
price = float(re.sub(r'[^\d.]', '', ad.get('price', '0')))
num_reactions = random.randint(5, min(15, int(price / 1000)))
selected_reactions = random.choices(reactions, k=num_reactions)
reaction_counts = {r: selected_reactions.count(r) for r in set(selected_reactions)}
return ''.join([f"{r}{count} " for r, count in reaction_counts.items()])
except Exception as e:
logger.error(f"Error generating reactions: {str(e)}")
return ""