Spaces:
Running
Running
# Commit test to ensure push | |
import sqlite3 | |
import random | |
import string | |
import logging | |
import re | |
from typing import List, Dict, Any, Optional | |
from datetime import datetime | |
# Configure logging | |
logging.basicConfig( | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
level=logging.INFO, | |
handlers=[ | |
logging.FileHandler("bot.log", encoding='utf-8'), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
class Database: | |
def __init__(self, db_name: str = "cars.db"): | |
try: | |
self.conn = sqlite3.connect(db_name, check_same_thread=False) | |
self.cursor = self.conn.cursor() | |
self.create_tables() | |
self.update_schema() | |
logger.info(f"Successfully connected to database: {db_name}") | |
except sqlite3.Error as e: | |
logger.error(f"Failed to connect to database: {str(e)}") | |
raise | |
def create_tables(self): | |
"""Create database tables and indexes if they don't exist.""" | |
try: | |
self.cursor.executescript(''' | |
-- Table for users | |
CREATE TABLE IF NOT EXISTS users ( | |
user_id INTEGER PRIMARY KEY, | |
username TEXT, | |
phone TEXT, | |
shares INTEGER DEFAULT 0, | |
daily_recommendations INTEGER DEFAULT 0, | |
last_recommendation_date TEXT | |
); | |
-- Table for ads | |
CREATE TABLE IF NOT EXISTS ads ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
user_id INTEGER, | |
price TEXT, | |
car_name_ar TEXT, | |
car_name_en TEXT, | |
model TEXT, | |
engine_specs TEXT, | |
mileage TEXT, | |
origin TEXT, | |
plate TEXT, | |
accidents TEXT, | |
specs TEXT, | |
notes TEXT, | |
location TEXT, | |
phone TEXT, | |
main_photo TEXT, | |
photos TEXT, | |
status TEXT DEFAULT 'pending', | |
channel_message_id INTEGER, | |
created_at TEXT, | |
reactions TEXT DEFAULT '', | |
FOREIGN KEY(user_id) REFERENCES users(user_id) | |
); | |
-- Table for verification codes | |
CREATE TABLE IF NOT EXISTS verification_codes ( | |
user_id INTEGER, | |
phone TEXT, | |
code TEXT, | |
PRIMARY KEY(user_id, phone) | |
); | |
-- Table for reactions | |
CREATE TABLE IF NOT EXISTS reactions ( | |
ad_id INTEGER, | |
reaction_type TEXT, | |
count INTEGER, | |
last_updated TEXT, | |
PRIMARY KEY(ad_id, reaction_type), | |
FOREIGN KEY(ad_id) REFERENCES ads(id) | |
); | |
-- Table for stats | |
CREATE TABLE IF NOT EXISTS stats ( | |
id INTEGER PRIMARY KEY, | |
subscribers_count INTEGER DEFAULT 1300, | |
last_updated TEXT | |
); | |
-- Insert default stats | |
INSERT OR IGNORE INTO stats (id, subscribers_count) VALUES (1, 1300); | |
-- Indexes for faster queries | |
CREATE INDEX IF NOT EXISTS idx_ads_user_id ON ads(user_id); | |
CREATE INDEX IF NOT EXISTS idx_ads_status ON ads(status); | |
CREATE INDEX IF NOT EXISTS idx_ads_car_name_ar ON ads(car_name_ar); | |
CREATE INDEX IF NOT EXISTS idx_ads_location ON ads(location); | |
CREATE INDEX IF NOT EXISTS idx_users_phone ON users(phone); | |
-- Added index on created_at | |
CREATE INDEX IF NOT EXISTS idx_ads_created_at ON ads(created_at); | |
''') | |
self.conn.commit() | |
logger.info("Database tables and indexes created successfully") | |
except sqlite3.Error as e: | |
logger.error(f"Error creating tables: {str(e)}") | |
raise | |
def update_schema(self): | |
"""Update database schema if needed (e.g., add new columns).""" | |
try: | |
# Check if 'username' column exists in users table | |
self.cursor.execute("PRAGMA table_info(users)") | |
columns = [info[1] for info in self.cursor.fetchall()] | |
if 'username' not in columns: | |
self.cursor.execute('ALTER TABLE users ADD COLUMN username TEXT') | |
self.conn.commit() | |
logger.info("Added 'username' column to users table") | |
except sqlite3.Error as e: | |
logger.error(f"Error updating schema: {str(e)}") | |
raise | |
def add_user(self, user_id: int, username: Optional[str] = None): | |
"""Add or update a user in the database.""" | |
try: | |
self.cursor.execute(''' | |
INSERT OR IGNORE INTO users (user_id, username, shares, daily_recommendations) | |
VALUES (?, ?, 0, 0) | |
''', (user_id, username)) | |
self.cursor.execute(''' | |
UPDATE stats SET subscribers_count = subscribers_count + 1, last_updated = ? | |
WHERE id = 1 | |
''', (datetime.now().strftime('%Y-%m-%d %H:%M:%S'),)) | |
self.conn.commit() | |
logger.info(f"User {user_id} (username: {username}) added/updated") | |
except sqlite3.Error as e: | |
logger.error(f"Error adding user {user_id}: {str(e)}") | |
raise | |
def get_user_shares(self, user_id: int) -> int: | |
"""Get the number of shares for a user.""" | |
try: | |
self.cursor.execute('SELECT shares FROM users WHERE user_id = ?', (user_id,)) | |
result = self.cursor.fetchone() | |
return result[0] if result else 0 | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving shares for user {user_id}: {str(e)}") | |
return 0 | |
def increment_shares(self, user_id: int): | |
"""Increment the share count for a user.""" | |
try: | |
self.cursor.execute('UPDATE users SET shares = shares + 1 WHERE user_id = ?', (user_id,)) | |
self.conn.commit() | |
logger.info(f"Shares incremented for user {user_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error incrementing shares for user {user_id}: {str(e)}") | |
def get_daily_recommendations(self, user_id: int) -> int: | |
"""Get the number of daily recommendations for a user.""" | |
try: | |
self.cursor.execute('SELECT daily_recommendations, last_recommendation_date FROM users WHERE user_id = ?', (user_id,)) | |
result = self.cursor.fetchone() | |
if result: | |
count, date = result | |
today = datetime.now().strftime('%Y-%m-%d') | |
if date != today: | |
self.cursor.execute('UPDATE users SET daily_recommendations = 0, last_recommendation_date = ? WHERE user_id = ?', (today, user_id)) | |
self.conn.commit() | |
logger.info(f"Reset daily recommendations for user {user_id} to 0") | |
return 0 | |
return count | |
return 0 | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving daily recommendations for user {user_id}: {str(e)}") | |
return 0 | |
def increment_recommendations(self, user_id: int): | |
"""Increment the daily recommendation count for a user.""" | |
try: | |
today = datetime.now().strftime('%Y-%m-%d') | |
self.cursor.execute(''' | |
UPDATE users SET daily_recommendations = daily_recommendations + 1, | |
last_recommendation_date = ? WHERE user_id = ? | |
''', (today, user_id)) | |
self.conn.commit() | |
logger.info(f"Recommendations incremented for user {user_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error incrementing recommendations for user {user_id}: {str(e)}") | |
def generate_verification_code(self, user_id: int, phone: str) -> str: | |
"""Generate a verification code for a user.""" | |
try: | |
code = ''.join(random.choices(string.digits, k=6)) | |
self.cursor.execute('INSERT OR REPLACE INTO verification_codes (user_id, phone, code) VALUES (?, ?, ?)', | |
(user_id, phone, code)) | |
self.conn.commit() | |
logger.info(f"Generated verification code for user {user_id} with phone {phone}") | |
return code | |
except sqlite3.Error as e: | |
logger.error(f"Error generating verification code for user {user_id}: {str(e)}") | |
raise | |
def verify_phone_code(self, user_id: int, code: str) -> bool: | |
"""Verify a phone code for a user.""" | |
try: | |
self.cursor.execute('SELECT code FROM verification_codes WHERE user_id = ?', (user_id,)) | |
result = self.cursor.fetchone() | |
if result and result[0] == code: | |
self.cursor.execute('DELETE FROM verification_codes WHERE user_id = ?', (user_id,)) | |
self.conn.commit() | |
logger.info(f"Phone code verified for user {user_id}") | |
return True | |
logger.warning(f"Invalid phone code for user {user_id}") | |
return False | |
except sqlite3.Error as e: | |
logger.error(f"Error verifying phone code for user {user_id}: {str(e)}") | |
return False | |
def create_car(self, model: str, price: float, status: str, description: Optional[str] = None) -> int: | |
"""Create a new car ad in the database.""" | |
try: | |
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
self.cursor.execute(''' | |
INSERT INTO ads (price, car_name_ar, car_name_en, model, status, notes, created_at) | |
VALUES (?, ?, ?, ?, ?, ?, ?) | |
''', ( | |
str(price), model, model, model, status, description, created_at | |
)) | |
ad_id = self.cursor.lastrowid | |
self.conn.commit() | |
logger.info(f"Created car ad {ad_id} with model {model}") | |
return ad_id | |
except sqlite3.Error as e: | |
logger.error(f"Error creating car ad: {str(e)}") | |
raise | |
def get_car_by_id(self, car_id: int) -> Optional[Dict[str, Any]]: | |
"""Retrieve a car ad by its ID.""" | |
try: | |
self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE id = ?', (car_id,)) | |
car = self.cursor.fetchone() | |
if car: | |
columns = [desc[0] for desc in self.cursor.description] | |
car_dict = dict(zip(columns, car)) | |
logger.info(f"Retrieved car {car_id}") | |
return car_dict | |
logger.warning(f"Car {car_id} not found") | |
return None | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving car {car_id}: {str(e)}") | |
raise | |
def get_all_cars(self) -> List[Dict[str, Any]]: | |
"""Retrieve all approved car ads.""" | |
try: | |
self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE status = "approved"') | |
cars = self.cursor.fetchall() | |
columns = [desc[0] for desc in self.cursor.description] | |
cars_list = [dict(zip(columns, car)) for car in cars] | |
logger.info(f"Retrieved {len(cars_list)} approved cars") | |
return cars_list | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving all cars: {str(e)}") | |
raise | |
def save_ad(self, ad_data: Dict[str, Any], user_id: int) -> int: | |
"""Save a new ad in the database.""" | |
try: | |
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
self.cursor.execute(''' | |
INSERT INTO ads (user_id, price, car_name_ar, car_name_en, model, engine_specs, mileage, origin, plate, | |
accidents, specs, notes, location, phone, main_photo, photos, created_at) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
''', ( | |
user_id, ad_data['price'], ad_data['car_name_ar'], ad_data['car_name_en'], ad_data['model'], | |
ad_data['engine_specs'], ad_data['mileage'], ad_data['origin'], ad_data['plate'], ad_data['accidents'], | |
ad_data['specs'], ad_data.get('notes'), ad_data['location'], ad_data['phone'], ad_data['main_photo'], | |
','.join(ad_data['photos']), created_at | |
)) | |
ad_id = self.cursor.lastrowid | |
self.conn.commit() | |
logger.info(f"Ad {ad_id} saved for user {user_id}") | |
return ad_id | |
except sqlite3.Error as e: | |
logger.error(f"Error saving ad for user {user_id}: {str(e)}") | |
raise | |
def get_ad(self, ad_id: int) -> Optional[Dict[str, Any]]: | |
"""Retrieve an ad by its ID.""" | |
try: | |
self.cursor.execute('SELECT * FROM ads WHERE id = ?', (ad_id,)) | |
ad = self.cursor.fetchone() | |
if ad: | |
columns = [desc[0] for desc in self.cursor.description] | |
ad_dict = dict(zip(columns, ad)) | |
ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else [] | |
logger.info(f"Retrieved ad {ad_id}") | |
return ad_dict | |
logger.warning(f"Ad {ad_id} not found") | |
return None | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving ad {ad_id}: {str(e)}") | |
raise | |
def update_ad_field(self, ad_id: int, field: str, value: Any): | |
"""Update a specific field for an ad.""" | |
try: | |
self.cursor.execute(f'UPDATE ads SET {field} = ? WHERE id = ?', (value, ad_id)) | |
self.conn.commit() | |
logger.info(f"Updated field {field} for ad {ad_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error updating field {field} for ad {ad_id}: {str(e)}") | |
raise | |
def update_ad_status(self, ad_id: int, status: str): | |
"""Update the status of an ad.""" | |
try: | |
self.cursor.execute('UPDATE ads SET status = ? WHERE id = ?', (status, ad_id)) | |
self.conn.commit() | |
logger.info(f"Updated status to {status} for ad {ad_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error updating status for ad {ad_id}: {str(e)}") | |
raise | |
def get_available_cars(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]: | |
"""Retrieve approved ads, optionally filtered by user ID.""" | |
try: | |
if user_id: | |
self.cursor.execute('SELECT * FROM ads WHERE status = "approved" AND user_id = ?', (user_id,)) | |
else: | |
self.cursor.execute('SELECT * FROM ads WHERE status = "approved"') | |
ads = self.cursor.fetchall() | |
columns = [desc[0] for desc in self.cursor.description] | |
result = [] | |
for ad in ads: | |
ad_dict = dict(zip(columns, ad)) | |
ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else [] | |
result.append(ad_dict) | |
logger.info(f"Retrieved {len(result)} available cars") | |
return result | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving available cars: {str(e)}") | |
raise | |
def save_channel_message(self, ad_id: int, message_id: int): | |
"""Save a channel message ID for an ad.""" | |
try: | |
self.cursor.execute('UPDATE ads SET channel_message_id = ? WHERE id = ?', (message_id, ad_id)) | |
self.conn.commit() | |
logger.info(f"Saved channel message ID {message_id} for ad {ad_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error saving channel message for ad {ad_id}: {str(e)}") | |
raise | |
def get_total_users(self) -> int: | |
"""Get the total number of users.""" | |
try: | |
self.cursor.execute("SELECT subscribers_count FROM stats WHERE id = 1") | |
result = self.cursor.fetchone() | |
count = result[0] if result else 1300 | |
logger.info(f"Retrieved total users: {count}") | |
return count | |
except sqlite3.Error as e: | |
logger.error(f"Error retrieving total users: {str(e)}") | |
return 1300 | |
def search_ads(self, query: str) -> List[Dict[str, Any]]: | |
"""Search ads based on a query string.""" | |
try: | |
query = query.lower().strip() | |
search_terms = re.split(r'\s+', query) | |
conditions = ["status = 'approved'"] | |
params = [] | |
for term in search_terms: | |
if term.isdigit() and len(term) == 4: # Model year (e.g., 2020) | |
conditions.append("model LIKE ?") | |
params.append(f"%{term}%") | |
elif re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term): # Price or range (e.g., 20 دولار, 20-30) | |
price_match = re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term) | |
if price_match: | |
low_price = float(price_match.group(1)) | |
currency = price_match.group(2) | |
high_price = float(price_match.group(3)) if price_match.group(3) else None | |
if currency == '-': | |
conditions.append("CAST(replace(replace(price, '$', ''), ' ألف دولار', '000') AS REAL) BETWEEN ? AND ?") | |
params.extend([low_price * 1000, high_price * 1000]) | |
else: | |
conditions.append("LOWER(price) LIKE ?") | |
params.append(f'%{low_price}%{currency if currency else ""}%') | |
else: # Car name, location, or specs | |
conditions.append("(LOWER(car_name_ar) LIKE ? OR LOWER(car_name_en) LIKE ? OR LOWER(location) LIKE ? OR LOWER(specs) LIKE ?)") | |
params.extend([f"%{term}%", f"%{term}%", f"%{term}%", f"%{term}%"]) | |
where_clause = " AND ".join(conditions) if conditions else "1=1" | |
self.cursor.execute(f'SELECT * FROM ads WHERE {where_clause}', params) | |
ads = self.cursor.fetchall() | |
columns = [desc[0] for desc in self.cursor.description] | |
result = [dict(zip(columns, ad)) for ad in ads] | |
for ad in result: | |
ad['photos'] = ad['photos'].split(',') if ad['photos'] else [] | |
logger.info(f"Found {len(result)} ads for query: {query}") | |
return result | |
except sqlite3.Error as e: | |
logger.error(f"Error searching ads for query {query}: {str(e)}") | |
raise | |
def add_reaction(self, ad_id: int, ad: Dict[str, Any]): | |
"""Add reactions to an ad.""" | |
try: | |
from utils import generate_reactions | |
reactions = generate_reactions(ad) | |
self.cursor.execute('UPDATE ads SET reactions = ? WHERE id = ?', (reactions, ad_id)) | |
self.conn.commit() | |
logger.info(f"Added reactions for ad {ad_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error adding reactions for ad {ad_id}: {str(e)}") | |
raise | |
except ImportError: | |
logger.error(f"Failed to import generate_reactions from utils for ad {ad_id}") | |
raise | |
def extend_ad_duration(self, ad_id: int): | |
"""Extend the duration of an ad.""" | |
try: | |
self.cursor.execute('UPDATE ads SET status = "approved" WHERE id = ?', (ad_id,)) | |
self.conn.commit() | |
logger.info(f"Extended duration for ad {ad_id}") | |
except sqlite3.Error as e: | |
logger.error(f"Error extending duration for ad {ad_id}: {str(e)}") | |
raise | |
def close(self): | |
"""Close the database connection.""" | |
try: | |
self.conn.close() | |
logger.info("Database connection closed successfully") | |
except sqlite3.Error as e: | |
logger.error(f"Error closing database: {str(e)}") | |