carbot-mixtral / utils.py
imstevenleo's picture
Upload 4 files
4e127d3 verified
raw
history blame
6.66 kB
import re
import random
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import TelegramError
import logging
from typing import Dict, Any, Optional, Tuple
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler("bot.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def escape_markdown(text: str) -> str:
if not text:
return ""
# إضافة دعم للهروب من المزيد من الأحرف الخاصة في MarkdownV2
escape_chars = r'_*[]()~`>#+=|{.!-}'
return ''.join(f'\\{char}' if char in escape_chars else char for char in str(text))
def generate_positive_message() -> str:
messages = [
"🚗 يلا نلقى سيارة أحلامك! 😍",
"💥 جاهزة لأحلى صفقة بالعراق! 🌟",
"🔥 بيع وشراء السيارات صار متعة! 🚘",
"😎 مع CarBot، كل شي أسهل! 💨"
]
return random.choice(messages)
def format_ad(ad: Dict[str, Any]) -> str:
if not ad:
return escape_markdown("❌ لا توجد بيانات!")
reactions = ad.get('reactions', '')
photos = ad.get('photos', [])
photo_count = len(photos) if photos else 0
fields = [
f"*السيارة*: {escape_markdown(ad.get('car_name_ar', ''))} \\({escape_markdown(ad.get('car_name_en', ''))}\\)",
f"*السعر*: {escape_markdown(ad.get('price', ''))}",
f"*الموديل*: {escape_markdown(ad.get('model', ''))}",
f"*المحرك*: {escape_markdown(ad.get('engine_specs', ''))}",
f"*عداد الأميال*: {escape_markdown(ad.get('mileage', ''))}",
f"*الوارد*: {escape_markdown(ad.get('origin', ''))}",
f"*رقم السيارة*: {escape_markdown(ad.get('plate', ''))}",
f"*الحوادث*: {escape_markdown(ad.get('accidents', ''))}",
f"*المواصفات*: {escape_markdown(ad.get('specs', ''))}",
f"*الملاحظات*: {escape_markdown(ad.get('notes', 'غير محدد'))}",
f"*الموقع*: {escape_markdown(ad.get('location', ''))}",
f"*رقم الهاتف*: {escape_markdown(ad.get('phone', ''))}",
f"*عدد الصور*: {photo_count}",
f"{reactions}"
]
return "\n".join(fields)
def validate_phone(phone: str) -> Tuple[Optional[str], Optional[str]]:
if not phone:
logger.warning("Empty phone number provided")
return None, "❌ أدخلي رقم هاتف صحيح (مثال: +9647712345678)"
try:
arabic_to_latin = str.maketrans('٠١٢٣٤٥٦٧٨٩', '0123456789')
phone = phone.translate(arabic_to_latin)
phone = re.sub(r'[\s\-]', '', phone)
# تحقق إضافي للتأكد إن الرقم يبدأ بـ +964 أو 0
if not (phone.startswith('+964') or phone.startswith('0')):
return None, "❌ الرقم لازم يبدأ بـ +964 أو 0! جربي مرة ثانية"
pattern = r'^(\+964|0)(77[0-7]|78[0-9]|79[0-9]|75[0-1][0-9])[0-9]{7}$'
if re.match(pattern, phone):
if not phone.startswith('+964'):
phone = '+964' + phone.lstrip('0')
return phone, None
logger.warning(f"Invalid phone number format: {phone}")
return None, "❌ رقم غير صحيح! جربي مثال: +9647712345678"
except Exception as e:
logger.error(f"Error validating phone number: {str(e)}")
return None, "❌ خطأ بالتحقق من الرقم! جربي مرة ثانية"
async def send_admin_notification(bot, admin_id: str, message: str, ad_id: Optional[int] = None):
if not admin_id:
logger.error("Admin chat ID not set")
return
try:
keyboard = []
if ad_id:
keyboard.append([
InlineKeyboardButton("✅ الموافقة", callback_data=f"admin_approve_{ad_id}"),
InlineKeyboardButton("❌ الرفض", callback_data=f"admin_reject_{ad_id}"),
InlineKeyboardButton("✏️ تعديل", callback_data=f"admin_edit_ad_{ad_id}")
])
await bot.send_message(
chat_id=admin_id,
text=escape_markdown(message),
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode='MarkdownV2'
)
logger.info(f"Admin notification sent: {message}")
except TelegramError as e:
logger.error(f"Failed to send admin notification: {str(e)}")
async def post_to_channel(context, channel_id: str, ad_text: str, photo: Optional[str] = None) -> Optional[int]:
try:
if photo:
message = await context.bot.send_photo(
chat_id=channel_id,
photo=photo,
caption=ad_text,
parse_mode='MarkdownV2'
)
else:
message = await context.bot.send_message(
chat_id=channel_id,
text=ad_text,
parse_mode='MarkdownV2'
)
logger.info(f"Posted to channel {channel_id}: {message.message_id}")
return message.message_id
except TelegramError as e:
logger.error(f"Failed to post to channel {channel_id}: {str(e)}")
return None
async def delete_channel_post(context):
from database import Database
try:
job_data = context.job.data
await context.bot.delete_message(
chat_id=job_data["channel_id"],
message_id=job_data["message_id"]
)
db = Database()
db.update_ad_status(job_data["ad_id"], "expired")
logger.info(f"Deleted channel post {job_data['message_id']} for ad {job_data['ad_id']}")
except TelegramError as e:
logger.error(f"Failed to delete channel post: {str(e)}")
def generate_reactions(ad: Dict[str, Any]) -> str:
reactions = ['👍', '❤️', '⭐', '🔥']
try:
price = float(re.sub(r'[^\d.]', '', ad.get('price', '0')))
num_reactions = random.randint(5, min(15, int(price / 1000)))
selected_reactions = random.choices(reactions, k=num_reactions)
reaction_counts = {r: selected_reactions.count(r) for r in set(selected_reactions)}
return ''.join([f"{r}{count} " for r, count in reaction_counts.items()])
except Exception as e:
logger.error(f"Error generating reactions: {str(e)}")
return ""