Spaces:
Running
Running
File size: 20,938 Bytes
4d7acad 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 539ff69 4e127d3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# Commit test to ensure push
import sqlite3
import random
import string
import logging
import re
from typing import List, Dict, Any, Optional
from datetime import datetime
# Configure logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler("bot.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class Database:
def __init__(self, db_name: str = "cars.db"):
try:
self.conn = sqlite3.connect(db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self.create_tables()
self.update_schema()
logger.info(f"Successfully connected to database: {db_name}")
except sqlite3.Error as e:
logger.error(f"Failed to connect to database: {str(e)}")
raise
def create_tables(self):
"""Create database tables and indexes if they don't exist."""
try:
self.cursor.executescript('''
-- Table for users
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
phone TEXT,
shares INTEGER DEFAULT 0,
daily_recommendations INTEGER DEFAULT 0,
last_recommendation_date TEXT
);
-- Table for ads
CREATE TABLE IF NOT EXISTS ads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
price TEXT,
car_name_ar TEXT,
car_name_en TEXT,
model TEXT,
engine_specs TEXT,
mileage TEXT,
origin TEXT,
plate TEXT,
accidents TEXT,
specs TEXT,
notes TEXT,
location TEXT,
phone TEXT,
main_photo TEXT,
photos TEXT,
status TEXT DEFAULT 'pending',
channel_message_id INTEGER,
created_at TEXT,
reactions TEXT DEFAULT '',
FOREIGN KEY(user_id) REFERENCES users(user_id)
);
-- Table for verification codes
CREATE TABLE IF NOT EXISTS verification_codes (
user_id INTEGER,
phone TEXT,
code TEXT,
PRIMARY KEY(user_id, phone)
);
-- Table for reactions
CREATE TABLE IF NOT EXISTS reactions (
ad_id INTEGER,
reaction_type TEXT,
count INTEGER,
last_updated TEXT,
PRIMARY KEY(ad_id, reaction_type),
FOREIGN KEY(ad_id) REFERENCES ads(id)
);
-- Table for stats
CREATE TABLE IF NOT EXISTS stats (
id INTEGER PRIMARY KEY,
subscribers_count INTEGER DEFAULT 1300,
last_updated TEXT
);
-- Insert default stats
INSERT OR IGNORE INTO stats (id, subscribers_count) VALUES (1, 1300);
-- Indexes for faster queries
CREATE INDEX IF NOT EXISTS idx_ads_user_id ON ads(user_id);
CREATE INDEX IF NOT EXISTS idx_ads_status ON ads(status);
CREATE INDEX IF NOT EXISTS idx_ads_car_name_ar ON ads(car_name_ar);
CREATE INDEX IF NOT EXISTS idx_ads_location ON ads(location);
CREATE INDEX IF NOT EXISTS idx_users_phone ON users(phone);
-- Added index on created_at
CREATE INDEX IF NOT EXISTS idx_ads_created_at ON ads(created_at);
''')
self.conn.commit()
logger.info("Database tables and indexes created successfully")
except sqlite3.Error as e:
logger.error(f"Error creating tables: {str(e)}")
raise
def update_schema(self):
"""Update database schema if needed (e.g., add new columns)."""
try:
# Check if 'username' column exists in users table
self.cursor.execute("PRAGMA table_info(users)")
columns = [info[1] for info in self.cursor.fetchall()]
if 'username' not in columns:
self.cursor.execute('ALTER TABLE users ADD COLUMN username TEXT')
self.conn.commit()
logger.info("Added 'username' column to users table")
except sqlite3.Error as e:
logger.error(f"Error updating schema: {str(e)}")
raise
def add_user(self, user_id: int, username: Optional[str] = None):
"""Add or update a user in the database."""
try:
self.cursor.execute('''
INSERT OR IGNORE INTO users (user_id, username, shares, daily_recommendations)
VALUES (?, ?, 0, 0)
''', (user_id, username))
self.cursor.execute('''
UPDATE stats SET subscribers_count = subscribers_count + 1, last_updated = ?
WHERE id = 1
''', (datetime.now().strftime('%Y-%m-%d %H:%M:%S'),))
self.conn.commit()
logger.info(f"User {user_id} (username: {username}) added/updated")
except sqlite3.Error as e:
logger.error(f"Error adding user {user_id}: {str(e)}")
raise
def get_user_shares(self, user_id: int) -> int:
"""Get the number of shares for a user."""
try:
self.cursor.execute('SELECT shares FROM users WHERE user_id = ?', (user_id,))
result = self.cursor.fetchone()
return result[0] if result else 0
except sqlite3.Error as e:
logger.error(f"Error retrieving shares for user {user_id}: {str(e)}")
return 0
def increment_shares(self, user_id: int):
"""Increment the share count for a user."""
try:
self.cursor.execute('UPDATE users SET shares = shares + 1 WHERE user_id = ?', (user_id,))
self.conn.commit()
logger.info(f"Shares incremented for user {user_id}")
except sqlite3.Error as e:
logger.error(f"Error incrementing shares for user {user_id}: {str(e)}")
def get_daily_recommendations(self, user_id: int) -> int:
"""Get the number of daily recommendations for a user."""
try:
self.cursor.execute('SELECT daily_recommendations, last_recommendation_date FROM users WHERE user_id = ?', (user_id,))
result = self.cursor.fetchone()
if result:
count, date = result
today = datetime.now().strftime('%Y-%m-%d')
if date != today:
self.cursor.execute('UPDATE users SET daily_recommendations = 0, last_recommendation_date = ? WHERE user_id = ?', (today, user_id))
self.conn.commit()
logger.info(f"Reset daily recommendations for user {user_id} to 0")
return 0
return count
return 0
except sqlite3.Error as e:
logger.error(f"Error retrieving daily recommendations for user {user_id}: {str(e)}")
return 0
def increment_recommendations(self, user_id: int):
"""Increment the daily recommendation count for a user."""
try:
today = datetime.now().strftime('%Y-%m-%d')
self.cursor.execute('''
UPDATE users SET daily_recommendations = daily_recommendations + 1,
last_recommendation_date = ? WHERE user_id = ?
''', (today, user_id))
self.conn.commit()
logger.info(f"Recommendations incremented for user {user_id}")
except sqlite3.Error as e:
logger.error(f"Error incrementing recommendations for user {user_id}: {str(e)}")
def generate_verification_code(self, user_id: int, phone: str) -> str:
"""Generate a verification code for a user."""
try:
code = ''.join(random.choices(string.digits, k=6))
self.cursor.execute('INSERT OR REPLACE INTO verification_codes (user_id, phone, code) VALUES (?, ?, ?)',
(user_id, phone, code))
self.conn.commit()
logger.info(f"Generated verification code for user {user_id} with phone {phone}")
return code
except sqlite3.Error as e:
logger.error(f"Error generating verification code for user {user_id}: {str(e)}")
raise
def verify_phone_code(self, user_id: int, code: str) -> bool:
"""Verify a phone code for a user."""
try:
self.cursor.execute('SELECT code FROM verification_codes WHERE user_id = ?', (user_id,))
result = self.cursor.fetchone()
if result and result[0] == code:
self.cursor.execute('DELETE FROM verification_codes WHERE user_id = ?', (user_id,))
self.conn.commit()
logger.info(f"Phone code verified for user {user_id}")
return True
logger.warning(f"Invalid phone code for user {user_id}")
return False
except sqlite3.Error as e:
logger.error(f"Error verifying phone code for user {user_id}: {str(e)}")
return False
def create_car(self, model: str, price: float, status: str, description: Optional[str] = None) -> int:
"""Create a new car ad in the database."""
try:
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.cursor.execute('''
INSERT INTO ads (price, car_name_ar, car_name_en, model, status, notes, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
str(price), model, model, model, status, description, created_at
))
ad_id = self.cursor.lastrowid
self.conn.commit()
logger.info(f"Created car ad {ad_id} with model {model}")
return ad_id
except sqlite3.Error as e:
logger.error(f"Error creating car ad: {str(e)}")
raise
def get_car_by_id(self, car_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve a car ad by its ID."""
try:
self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE id = ?', (car_id,))
car = self.cursor.fetchone()
if car:
columns = [desc[0] for desc in self.cursor.description]
car_dict = dict(zip(columns, car))
logger.info(f"Retrieved car {car_id}")
return car_dict
logger.warning(f"Car {car_id} not found")
return None
except sqlite3.Error as e:
logger.error(f"Error retrieving car {car_id}: {str(e)}")
raise
def get_all_cars(self) -> List[Dict[str, Any]]:
"""Retrieve all approved car ads."""
try:
self.cursor.execute('SELECT id, price, car_name_ar AS model, status, notes AS description, created_at FROM ads WHERE status = "approved"')
cars = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
cars_list = [dict(zip(columns, car)) for car in cars]
logger.info(f"Retrieved {len(cars_list)} approved cars")
return cars_list
except sqlite3.Error as e:
logger.error(f"Error retrieving all cars: {str(e)}")
raise
def save_ad(self, ad_data: Dict[str, Any], user_id: int) -> int:
"""Save a new ad in the database."""
try:
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.cursor.execute('''
INSERT INTO ads (user_id, price, car_name_ar, car_name_en, model, engine_specs, mileage, origin, plate,
accidents, specs, notes, location, phone, main_photo, photos, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
user_id, ad_data['price'], ad_data['car_name_ar'], ad_data['car_name_en'], ad_data['model'],
ad_data['engine_specs'], ad_data['mileage'], ad_data['origin'], ad_data['plate'], ad_data['accidents'],
ad_data['specs'], ad_data.get('notes'), ad_data['location'], ad_data['phone'], ad_data['main_photo'],
','.join(ad_data['photos']), created_at
))
ad_id = self.cursor.lastrowid
self.conn.commit()
logger.info(f"Ad {ad_id} saved for user {user_id}")
return ad_id
except sqlite3.Error as e:
logger.error(f"Error saving ad for user {user_id}: {str(e)}")
raise
def get_ad(self, ad_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve an ad by its ID."""
try:
self.cursor.execute('SELECT * FROM ads WHERE id = ?', (ad_id,))
ad = self.cursor.fetchone()
if ad:
columns = [desc[0] for desc in self.cursor.description]
ad_dict = dict(zip(columns, ad))
ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else []
logger.info(f"Retrieved ad {ad_id}")
return ad_dict
logger.warning(f"Ad {ad_id} not found")
return None
except sqlite3.Error as e:
logger.error(f"Error retrieving ad {ad_id}: {str(e)}")
raise
def update_ad_field(self, ad_id: int, field: str, value: Any):
"""Update a specific field for an ad."""
try:
self.cursor.execute(f'UPDATE ads SET {field} = ? WHERE id = ?', (value, ad_id))
self.conn.commit()
logger.info(f"Updated field {field} for ad {ad_id}")
except sqlite3.Error as e:
logger.error(f"Error updating field {field} for ad {ad_id}: {str(e)}")
raise
def update_ad_status(self, ad_id: int, status: str):
"""Update the status of an ad."""
try:
self.cursor.execute('UPDATE ads SET status = ? WHERE id = ?', (status, ad_id))
self.conn.commit()
logger.info(f"Updated status to {status} for ad {ad_id}")
except sqlite3.Error as e:
logger.error(f"Error updating status for ad {ad_id}: {str(e)}")
raise
def get_available_cars(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
"""Retrieve approved ads, optionally filtered by user ID."""
try:
if user_id:
self.cursor.execute('SELECT * FROM ads WHERE status = "approved" AND user_id = ?', (user_id,))
else:
self.cursor.execute('SELECT * FROM ads WHERE status = "approved"')
ads = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
result = []
for ad in ads:
ad_dict = dict(zip(columns, ad))
ad_dict['photos'] = ad_dict['photos'].split(',') if ad_dict['photos'] else []
result.append(ad_dict)
logger.info(f"Retrieved {len(result)} available cars")
return result
except sqlite3.Error as e:
logger.error(f"Error retrieving available cars: {str(e)}")
raise
def save_channel_message(self, ad_id: int, message_id: int):
"""Save a channel message ID for an ad."""
try:
self.cursor.execute('UPDATE ads SET channel_message_id = ? WHERE id = ?', (message_id, ad_id))
self.conn.commit()
logger.info(f"Saved channel message ID {message_id} for ad {ad_id}")
except sqlite3.Error as e:
logger.error(f"Error saving channel message for ad {ad_id}: {str(e)}")
raise
def get_total_users(self) -> int:
"""Get the total number of users."""
try:
self.cursor.execute("SELECT subscribers_count FROM stats WHERE id = 1")
result = self.cursor.fetchone()
count = result[0] if result else 1300
logger.info(f"Retrieved total users: {count}")
return count
except sqlite3.Error as e:
logger.error(f"Error retrieving total users: {str(e)}")
return 1300
def search_ads(self, query: str) -> List[Dict[str, Any]]:
"""Search ads based on a query string."""
try:
query = query.lower().strip()
search_terms = re.split(r'\s+', query)
conditions = ["status = 'approved'"]
params = []
for term in search_terms:
if term.isdigit() and len(term) == 4: # Model year (e.g., 2020)
conditions.append("model LIKE ?")
params.append(f"%{term}%")
elif re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term): # Price or range (e.g., 20 دولار, 20-30)
price_match = re.match(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?', term)
if price_match:
low_price = float(price_match.group(1))
currency = price_match.group(2)
high_price = float(price_match.group(3)) if price_match.group(3) else None
if currency == '-':
conditions.append("CAST(replace(replace(price, '$', ''), ' ألف دولار', '000') AS REAL) BETWEEN ? AND ?")
params.extend([low_price * 1000, high_price * 1000])
else:
conditions.append("LOWER(price) LIKE ?")
params.append(f'%{low_price}%{currency if currency else ""}%')
else: # Car name, location, or specs
conditions.append("(LOWER(car_name_ar) LIKE ? OR LOWER(car_name_en) LIKE ? OR LOWER(location) LIKE ? OR LOWER(specs) LIKE ?)")
params.extend([f"%{term}%", f"%{term}%", f"%{term}%", f"%{term}%"])
where_clause = " AND ".join(conditions) if conditions else "1=1"
self.cursor.execute(f'SELECT * FROM ads WHERE {where_clause}', params)
ads = self.cursor.fetchall()
columns = [desc[0] for desc in self.cursor.description]
result = [dict(zip(columns, ad)) for ad in ads]
for ad in result:
ad['photos'] = ad['photos'].split(',') if ad['photos'] else []
logger.info(f"Found {len(result)} ads for query: {query}")
return result
except sqlite3.Error as e:
logger.error(f"Error searching ads for query {query}: {str(e)}")
raise
def add_reaction(self, ad_id: int, ad: Dict[str, Any]):
"""Add reactions to an ad."""
try:
from utils import generate_reactions
reactions = generate_reactions(ad)
self.cursor.execute('UPDATE ads SET reactions = ? WHERE id = ?', (reactions, ad_id))
self.conn.commit()
logger.info(f"Added reactions for ad {ad_id}")
except sqlite3.Error as e:
logger.error(f"Error adding reactions for ad {ad_id}: {str(e)}")
raise
except ImportError:
logger.error(f"Failed to import generate_reactions from utils for ad {ad_id}")
raise
def extend_ad_duration(self, ad_id: int):
"""Extend the duration of an ad."""
try:
self.cursor.execute('UPDATE ads SET status = "approved" WHERE id = ?', (ad_id,))
self.conn.commit()
logger.info(f"Extended duration for ad {ad_id}")
except sqlite3.Error as e:
logger.error(f"Error extending duration for ad {ad_id}: {str(e)}")
raise
def close(self):
"""Close the database connection."""
try:
self.conn.close()
logger.info("Database connection closed successfully")
except sqlite3.Error as e:
logger.error(f"Error closing database: {str(e)}")
|