# Commit test to ensure push import sqlite3 import random import string import logging import re from typing import List, Dict, Any, Optional from datetime import datetime # Configure logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[ logging.FileHandler("bot.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class Database: def __init__(self, db_name: str = "cars.db"): try: self.conn = sqlite3.connect(db_name, check_same_thread=False) self.cursor = self.conn.cursor() self.create_tables() self.update_schema() logger.info(f"Successfully connected to database: {db_name}") except sqlite3.Error as e: logger.error(f"Failed to connect to database: {str(e)}") raise def create_tables(self): """Create database tables and indexes if they don't exist.""" try: self.cursor.executescript(''' -- Table for users CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, phone TEXT, shares INTEGER DEFAULT 0, daily_recommendations INTEGER DEFAULT 0, last_recommendation_date TEXT ); -- Table for ads CREATE TABLE IF NOT EXISTS ads ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, price TEXT, car_name_ar TEXT, car_name_en TEXT, model TEXT, engine_specs TEXT, mileage TEXT, origin TEXT, plate TEXT, accidents TEXT, specs TEXT, notes TEXT, location TEXT, phone TEXT, main_photo TEXT, photos TEXT, status TEXT DEFAULT 'pending', channel_message_id INTEGER, created_at TEXT, reactions TEXT DEFAULT '', FOREIGN KEY(user_id) REFERENCES users(user_id) ); -- Table for verification codes CREATE TABLE IF NOT EXISTS verification_codes ( user_id INTEGER, phone TEXT, code TEXT, PRIMARY KEY(user_id, phone) ); -- Table for reactions CREATE TABLE IF NOT EXISTS reactions ( ad_id INTEGER, reaction_type TEXT, count INTEGER, last_updated TEXT, PRIMARY KEY(ad_id, reaction_type), FOREIGN KEY(ad_id) REFERENCES ads(id) ); -- Table for stats CREATE TABLE IF NOT EXISTS stats ( id INTEGER PRIMARY KEY, subscribers_count INTEGER DEFAULT 1300, last_updated TEXT ); -- Insert default stats INSERT OR IGNORE INTO stats (id, subscribers_count) VALUES (1, 1300); -- Indexes for faster queries CREATE INDEX IF NOT EXISTS idx_ads_user_id ON ads(user_id); CREATE INDEX IF NOT EXISTS idx_ads_status ON ads(status); CREATE INDEX IF NOT EXISTS idx_ads_car_name_ar ON ads(car_name_ar); CREATE INDEX IF NOT EXISTS idx_ads_location ON ads(location); CREATE INDEX IF NOT EXISTS idx_users_phone ON users(phone); -- Added index on created_at CREATE INDEX IF NOT EXISTS idx_ads_created_at ON ads(created_at); ''') self.conn.commit() logger.info("Database tables and indexes created successfully") except sqlite3.Error as e: logger.error(f"Error creating tables: {str(e)}") raise def update_schema(self): """Update database schema if needed (e.g., add new columns).""" try: # Check if 'username' column exists in users table self.cursor.execute("PRAGMA table_info(users)") columns = [info[1] for info in self.cursor.fetchall()] if 'username' not in columns: self.cursor.execute('ALTER TABLE users ADD COLUMN username TEXT') self.conn.commit() logger.info("Added 'username' column to users table") except sqlite3.Error as e: logger.error(f"Error updating schema: {str(e)}") raise def add_user(self, user_id: int, username: Optional[str] = None): """Add or update a user in the database.""" try: self.cursor.execute(''' INSERT OR IGNORE INTO users (user_id, username, shares, daily_recommendations) VALUES (?, ?, 0, 0) ''', (user_id, username)) self.cursor.execute(''' UPDATE stats SET subscribers_count = subscribers_count + 1, last_updated = ? WHERE id = 1 ''', (datetime.now().strftime('%Y-%m-%d %H:%M:%S'),)) self.conn.commit() logger.info(f"User {user_id} (username: {username}) added/updated") except sqlite3.Error as e: logger.error(f"Error adding user {user_id}: {str(e)}") raise def get_user_shares(self, user_id: int) -> int: """Get the number of shares for a user.""" try: self.cursor.execute('SELECT shares FROM users WHERE user_id = ?', (user_id,)) result = self.cursor.fetchone() return result[0] if result else 0 except sqlite3.Error as e: logger.error(f"Error retrieving shares for user {user_id}: {str(e)}") return 0 def increment_shares(self, user_id: int): """Increment the share count for a user.""" try: self.cursor.execute('UPDATE users SET shares = shares + 1 WHERE user_id = ?', (user_id,)) self.conn.commit() logger.info(f"Shares incremented for user {user_id}") except sqlite3.Error as e: logger.error(f"Error incrementing shares for user {user_id}: {str(e)}") def get_daily_recommendations(self, user_id: int) -> int: """Get the number of daily recommendations for a user.""" try: self.cursor.execute('SELECT daily_recommendations, last_recommendation_date FROM users WHERE user_id = ?', (user_id,)) result = self.cursor.fetchone() if result: count, date = result today = datetime.now().strftime('%Y-%m-%d') if date != today: self.cursor.execute('UPDATE users SET daily_recommendations = 0, last_recommendation_date = ? WHERE user_id = ?', (today, user_id)) self.conn.commit() logger.info(f"Reset daily recommendations for user {user_id} to 0") return 0 return count return 0 except sqlite3.Error as e: logger.error(f"Error retrieving daily recommendations for user {user_id}: {str(e)}") return 0 def increment_recommendations(self, user_id: int): """Increment the daily recommendation count for a user.""" try: today = datetime.now().strftime('%Y-%m-%d') self.cursor.execute(''' UPDATE users SET daily_recommendations = daily_recommendations + 1, last_recommendation_date = ? WHERE user_id = ? ''', (today, user_id)) self.conn.commit() logger.info(f"Recommendations incremented for user {user_id}") except sqlite3.Error as e: logger.error(f"Error incrementing recommendations for user {user_id}: {str(e)}") def generate_verification_code(self, user_id: int, phone: str) -> str: """Generate a verification code for a user.""" try: code = ''.join(random.choices(string.digits, k=6)) self.cursor.execute('INSERT OR REPLACE INTO verification_codes (user_id, phone, code) VALUES (?, ?, ?)', (user_id, phone, code)) self.conn.commit() logger.info(f"Generated verification code for user {user_id} with phone {phone}") return code except sqlite3.Error as e: logger.error(f"Error generating verification code for user {user_id}: {str(e)}") raise def verify_phone_code(self, user_id: int, code: str) -> bool: """Verify a phone code for a user.""" try: self.cursor.execute('SELECT code FROM verification_codes WHERE user_id = ?', (user_id,)) result = self.cursor.fetchone() if result and result[0] == code: self.cursor.execute('DELETE FROM verification_codes WHERE user_id = ?', (user_id,)) self.conn.commit() logger.info(f"Phone code verified for user {user_id}") return True logger.warning(f"Invalid phone code for user {user_id}") return False except sqlite3.Error as e: logger.error(f"Error verifying phone code for user {user_id}: {str(e)}") return False def create_car(self, model: str, price: float, status: str, description: Optional[str] = None) -> int: """Create a new car ad in the database.""" try: created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.cursor.execute(''' INSERT INTO ads (price, car_name_ar, car_name_en, model, status, notes, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( str(price), model, model, model, status, description, created_at )) ad_id = self.cursor.lastrowid self.conn.commit() logger.info(f"Created car ad {ad_id} with model {model}") return ad_id except sqlite3.Error as e: logger.error(f"Error creating car ad: {str(e)}") raise def get_car_by_id(self, car_id: int) -> Optional[Dict[str, Any]]: """Retrieve a car ad by its ID.""" try: self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE id = ?', (car_id,)) car = self.cursor.fetchone() if car: columns = [desc[0] for desc in self.cursor.description] car_dict = dict(zip(columns, car)) logger.info(f"Retrieved car {car_id}") return car_dict logger.warning(f"Car {car_id} not found") return None except sqlite3.Error as e: logger.error(f"Error retrieving car {car_id}: {str(e)}") raise def get_all_cars(self) -> List[Dict[str, Any]]: """Retrieve all approved car ads.""" try: self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE status = "approved"') cars = self.cursor.fetchall() columns = [desc[0] for desc in self.cursor.description] cars_list = [dict(zip(columns, car)) for car in cars] logger.info(f"Retrieved {len(cars_list)} approved cars") return cars_list except sqlite3.Error as e: logger.error(f"Error retrieving all cars: {str(e)}") raise def save_ad(self, ad_data: Dict[str, Any], user_id: int) -> int: """Save a new ad in the database.""" try: created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.cursor.execute(''' INSERT INTO ads (user_id, price, car_name_ar, car_name_en, model, engine_specs, mileage, origin, plate, accidents, specs, notes, location, phone, main_photo, photos, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( user_id, ad_data['price'], ad_data['car_name_ar'], ad_data['car_name_en'], ad_data['model'], ad_data['engine_specs'], ad_data['mileage'], ad_data['origin'], ad_data['plate'], ad_data['accidents'], ad_data['specs'], ad_data.get('notes'), ad_data['location'], ad_data['phone'], ad_data['main_photo'], ','.join(ad_data['photos']), created_at )) ad_id = self.cursor.lastrowid self.conn.commit() logger.info(f"Ad {ad_id} saved for user {user_id}") return ad_id except sqlite3.Error as e: logger.error(f"Error saving ad for user {user_id}: {str(e)}") raise def get_ad(self, ad_id: int) -> Optional[Dict[str, Any]]: """Retrieve an ad by its ID.""" try: self.cursor.execute('SELECT * FROM ads WHERE id = ?', (ad_id,)) ad = self.cursor.fetchone() if ad: columns = [desc[0] for desc in self.cursor.description] ad_dict = dict(zip(columns, ad)) ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else [] logger.info(f"Retrieved ad {ad_id}") return ad_dict logger.warning(f"Ad {ad_id} not found") return None except sqlite3.Error as e: logger.error(f"Error retrieving ad {ad_id}: {str(e)}") raise def update_ad_field(self, ad_id: int, field: str, value: Any): """Update a specific field for an ad.""" try: self.cursor.execute(f'UPDATE ads SET {field} = ? WHERE id = ?', (value, ad_id)) self.conn.commit() logger.info(f"Updated field {field} for ad {ad_id}") except sqlite3.Error as e: logger.error(f"Error updating field {field} for ad {ad_id}: {str(e)}") raise def update_ad_status(self, ad_id: int, status: str): """Update the status of an ad.""" try: self.cursor.execute('UPDATE ads SET status = ? WHERE id = ?', (status, ad_id)) self.conn.commit() logger.info(f"Updated status to {status} for ad {ad_id}") except sqlite3.Error as e: logger.error(f"Error updating status for ad {ad_id}: {str(e)}") raise def get_available_cars(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]: """Retrieve approved ads, optionally filtered by user ID.""" try: if user_id: self.cursor.execute('SELECT * FROM ads WHERE status = "approved" AND user_id = ?', (user_id,)) else: self.cursor.execute('SELECT * FROM ads WHERE status = "approved"') ads = self.cursor.fetchall() columns = [desc[0] for desc in self.cursor.description] result = [] for ad in ads: ad_dict = dict(zip(columns, ad)) ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else [] result.append(ad_dict) logger.info(f"Retrieved {len(result)} available cars") return result except sqlite3.Error as e: logger.error(f"Error retrieving available cars: {str(e)}") raise def save_channel_message(self, ad_id: int, message_id: int): """Save a channel message ID for an ad.""" try: self.cursor.execute('UPDATE ads SET channel_message_id = ? WHERE id = ?', (message_id, ad_id)) self.conn.commit() logger.info(f"Saved channel message ID {message_id} for ad {ad_id}") except sqlite3.Error as e: logger.error(f"Error saving channel message for ad {ad_id}: {str(e)}") raise def get_total_users(self) -> int: """Get the total number of users.""" try: self.cursor.execute("SELECT subscribers_count FROM stats WHERE id = 1") result = self.cursor.fetchone() count = result[0] if result else 1300 logger.info(f"Retrieved total users: {count}") return count except sqlite3.Error as e: logger.error(f"Error retrieving total users: {str(e)}") return 1300 def search_ads(self, query: str) -> List[Dict[str, Any]]: """Search ads based on a query string.""" try: query = query.lower().strip() search_terms = re.split(r'\s+', query) conditions = ["status = 'approved'"] params = [] for term in search_terms: if term.isdigit() and len(term) == 4: # Model year (e.g., 2020) conditions.append("model LIKE ?") params.append(f"%{term}%") elif re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term): # Price or range (e.g., 20 دولار, 20-30) price_match = re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term) if price_match: low_price = float(price_match.group(1)) currency = price_match.group(2) high_price = float(price_match.group(3)) if price_match.group(3) else None if currency == '-': conditions.append("CAST(replace(replace(price, '$', ''), ' ألف دولار', '000') AS REAL) BETWEEN ? AND ?") params.extend([low_price * 1000, high_price * 1000]) else: conditions.append("LOWER(price) LIKE ?") params.append(f'%{low_price}%{currency if currency else ""}%') else: # Car name, location, or specs conditions.append("(LOWER(car_name_ar) LIKE ? OR LOWER(car_name_en) LIKE ? OR LOWER(location) LIKE ? OR LOWER(specs) LIKE ?)") params.extend([f"%{term}%", f"%{term}%", f"%{term}%", f"%{term}%"]) where_clause = " AND ".join(conditions) if conditions else "1=1" self.cursor.execute(f'SELECT * FROM ads WHERE {where_clause}', params) ads = self.cursor.fetchall() columns = [desc[0] for desc in self.cursor.description] result = [dict(zip(columns, ad)) for ad in ads] for ad in result: ad['photos'] = ad['photos'].split(',') if ad['photos'] else [] logger.info(f"Found {len(result)} ads for query: {query}") return result except sqlite3.Error as e: logger.error(f"Error searching ads for query {query}: {str(e)}") raise def add_reaction(self, ad_id: int, ad: Dict[str, Any]): """Add reactions to an ad.""" try: from utils import generate_reactions reactions = generate_reactions(ad) self.cursor.execute('UPDATE ads SET reactions = ? WHERE id = ?', (reactions, ad_id)) self.conn.commit() logger.info(f"Added reactions for ad {ad_id}") except sqlite3.Error as e: logger.error(f"Error adding reactions for ad {ad_id}: {str(e)}") raise except ImportError: logger.error(f"Failed to import generate_reactions from utils for ad {ad_id}") raise def extend_ad_duration(self, ad_id: int): """Extend the duration of an ad.""" try: self.cursor.execute('UPDATE ads SET status = "approved" WHERE id = ?', (ad_id,)) self.conn.commit() logger.info(f"Extended duration for ad {ad_id}") except sqlite3.Error as e: logger.error(f"Error extending duration for ad {ad_id}: {str(e)}") raise def close(self): """Close the database connection.""" try: self.conn.close() logger.info("Database connection closed successfully") except sqlite3.Error as e: logger.error(f"Error closing database: {str(e)}")