# carbot-mixtral / database.py
# Last commit 4d7acad by imstevenleo:
#   "Updated app.py and database.py with correct imports and create_tables"
# File size: 20.9 kB
# Commit test to ensure push
import sqlite3
import random
import string
import logging
import re
from typing import List, Dict, Any, Optional
from datetime import datetime
# Set up process-wide logging: every record at INFO or above goes both to
# the UTF-8 encoded file "bot.log" and to the console stream.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_file_handler = logging.FileHandler("bot.log", encoding='utf-8')
_log_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[_log_file_handler, _log_console_handler],
)
logger = logging.getLogger(__name__)


class Database:
    """SQLite-backed storage for users, car ads, verification codes and stats.

    One connection and one cursor are shared by every method. The
    connection is opened with ``check_same_thread=False`` and no locking is
    performed here, so callers that use a single instance from several
    threads must serialize access themselves.

    Write methods run inside ``with self.conn:`` so that a failing statement
    rolls the whole operation back instead of leaving a half-finished
    transaction to be committed by a later call.
    """

    # A number, optionally followed by a currency word or by a range dash
    # and a second number (e.g. "20", "20دولار", "20-30").
    _PRICE_TERM_RE = re.compile(r'(\d+\.?\d*)\s*(دولار|دينار|-)?(?:\s*(\d+\.?\d*))?')

    # SQL expression that turns the free-text price column into a number.
    _PRICE_AS_NUMBER_SQL = (
        "CAST(replace(replace(price, '$', ''), ' ألف دولار', '000') AS REAL)"
    )

    # OS-backed random source; verification codes must not be predictable.
    _secure_random = random.SystemRandom()

    def __init__(self, db_name: str = "cars.db"):
        """Open (or create) the database and make sure the schema exists.

        Args:
            db_name: Path of the SQLite file, or ``":memory:"``.

        Raises:
            sqlite3.Error: If connecting or preparing the schema fails. The
                connection is closed again before the error propagates.
        """
        conn = None
        try:
            conn = sqlite3.connect(db_name, check_same_thread=False)
            self.conn = conn
            self.cursor = conn.cursor()
            self.create_tables()
            self.update_schema()
            logger.info(f"Successfully connected to database: {db_name}")
        except sqlite3.Error as e:
            logger.error(f"Failed to connect to database: {e}")
            if conn is not None:
                # Do not leak the handle when schema setup fails.
                conn.close()
            raise

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when the ``with`` block ends."""
        self.close()
        return False

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _fetch_ads(self) -> List[Dict[str, Any]]:
        """Convert the rows of the last ``SELECT * FROM ads`` into dicts.

        The comma-separated ``photos`` column is split into a list (an empty
        list when the column is NULL or empty).
        """
        columns = [desc[0] for desc in self.cursor.description]
        ads = []
        for row in self.cursor.fetchall():
            ad = dict(zip(columns, row))
            ad['photos'] = ad['photos'].split(',') if ad['photos'] else []
            ads.append(ad)
        return ads

    def create_tables(self):
        """Create database tables and indexes if they don't exist.

        Raises:
            sqlite3.Error: If the schema script fails.
        """
        try:
            self.cursor.executescript('''
                -- Table for users
                CREATE TABLE IF NOT EXISTS users (
                    user_id INTEGER PRIMARY KEY,
                    username TEXT,
                    phone TEXT,
                    shares INTEGER DEFAULT 0,
                    daily_recommendations INTEGER DEFAULT 0,
                    last_recommendation_date TEXT
                );
                -- Table for ads
                CREATE TABLE IF NOT EXISTS ads (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    price TEXT,
                    car_name_ar TEXT,
                    car_name_en TEXT,
                    model TEXT,
                    engine_specs TEXT,
                    mileage TEXT,
                    origin TEXT,
                    plate TEXT,
                    accidents TEXT,
                    specs TEXT,
                    notes TEXT,
                    location TEXT,
                    phone TEXT,
                    main_photo TEXT,
                    photos TEXT,
                    status TEXT DEFAULT 'pending',
                    channel_message_id INTEGER,
                    created_at TEXT,
                    reactions TEXT DEFAULT '',
                    FOREIGN KEY(user_id) REFERENCES users(user_id)
                );
                -- Table for verification codes
                CREATE TABLE IF NOT EXISTS verification_codes (
                    user_id INTEGER,
                    phone TEXT,
                    code TEXT,
                    PRIMARY KEY(user_id, phone)
                );
                -- Table for reactions
                CREATE TABLE IF NOT EXISTS reactions (
                    ad_id INTEGER,
                    reaction_type TEXT,
                    count INTEGER,
                    last_updated TEXT,
                    PRIMARY KEY(ad_id, reaction_type),
                    FOREIGN KEY(ad_id) REFERENCES ads(id)
                );
                -- Table for stats
                CREATE TABLE IF NOT EXISTS stats (
                    id INTEGER PRIMARY KEY,
                    subscribers_count INTEGER DEFAULT 1300,
                    last_updated TEXT
                );
                -- Insert default stats
                INSERT OR IGNORE INTO stats (id, subscribers_count) VALUES (1, 1300);
                -- Indexes for faster queries
                CREATE INDEX IF NOT EXISTS idx_ads_user_id ON ads(user_id);
                CREATE INDEX IF NOT EXISTS idx_ads_status ON ads(status);
                CREATE INDEX IF NOT EXISTS idx_ads_car_name_ar ON ads(car_name_ar);
                CREATE INDEX IF NOT EXISTS idx_ads_location ON ads(location);
                CREATE INDEX IF NOT EXISTS idx_users_phone ON users(phone);
                -- Added index on created_at
                CREATE INDEX IF NOT EXISTS idx_ads_created_at ON ads(created_at);
            ''')
            self.conn.commit()
            logger.info("Database tables and indexes created successfully")
        except sqlite3.Error as e:
            logger.error(f"Error creating tables: {e}")
            raise

    def update_schema(self):
        """Update database schema if needed (e.g., add new columns).

        Raises:
            sqlite3.Error: If inspecting or altering the schema fails.
        """
        try:
            # Check if 'username' column exists in users table
            self.cursor.execute("PRAGMA table_info(users)")
            columns = [info[1] for info in self.cursor.fetchall()]
            if 'username' not in columns:
                self.cursor.execute('ALTER TABLE users ADD COLUMN username TEXT')
                self.conn.commit()
                logger.info("Added 'username' column to users table")
        except sqlite3.Error as e:
            logger.error(f"Error updating schema: {e}")
            raise

    def add_user(self, user_id: int, username: Optional[str] = None):
        """Add a user, or refresh the username of an existing one.

        The subscriber counter in ``stats`` is incremented only when a new
        row was actually inserted, so repeated calls for the same user do
        not inflate it.

        Args:
            user_id: Primary key of the user.
            username: Optional username; when given for an existing user the
                stored value is replaced.

        Raises:
            sqlite3.Error: If any statement fails (nothing is committed).
        """
        try:
            with self.conn:
                self.cursor.execute(
                    '''
                    INSERT OR IGNORE INTO users (user_id, username, shares, daily_recommendations)
                    VALUES (?, ?, 0, 0)
                    ''',
                    (user_id, username),
                )
                if self.cursor.rowcount > 0:
                    # Genuinely new user: count it once.
                    self.cursor.execute(
                        '''
                        UPDATE stats SET subscribers_count = subscribers_count + 1, last_updated = ?
                        WHERE id = 1
                        ''',
                        (self._timestamp(),),
                    )
                elif username is not None:
                    self.cursor.execute(
                        'UPDATE users SET username = ? WHERE user_id = ?',
                        (username, user_id),
                    )
            logger.info(f"User {user_id} (username: {username}) added/updated")
        except sqlite3.Error as e:
            logger.error(f"Error adding user {user_id}: {e}")
            raise

    def get_user_shares(self, user_id: int) -> int:
        """Return the user's share count; 0 for unknown users or on DB error."""
        try:
            self.cursor.execute('SELECT shares FROM users WHERE user_id = ?', (user_id,))
            result = self.cursor.fetchone()
            return result[0] if result else 0
        except sqlite3.Error as e:
            logger.error(f"Error retrieving shares for user {user_id}: {e}")
            return 0

    def increment_shares(self, user_id: int):
        """Increment the share count for a user (errors are logged, not raised)."""
        try:
            with self.conn:
                self.cursor.execute(
                    'UPDATE users SET shares = shares + 1 WHERE user_id = ?', (user_id,)
                )
            logger.info(f"Shares incremented for user {user_id}")
        except sqlite3.Error as e:
            logger.error(f"Error incrementing shares for user {user_id}: {e}")

    def get_daily_recommendations(self, user_id: int) -> int:
        """Return today's recommendation count for a user.

        If the stored date is not today the counter is reset to 0 in the
        database first. Unknown users and database errors yield 0.
        """
        try:
            self.cursor.execute(
                'SELECT daily_recommendations, last_recommendation_date FROM users WHERE user_id = ?',
                (user_id,),
            )
            result = self.cursor.fetchone()
            if not result:
                return 0
            count, date = result
            today = datetime.now().strftime('%Y-%m-%d')
            if date != today:
                with self.conn:
                    self.cursor.execute(
                        'UPDATE users SET daily_recommendations = 0, last_recommendation_date = ? WHERE user_id = ?',
                        (today, user_id),
                    )
                logger.info(f"Reset daily recommendations for user {user_id} to 0")
                return 0
            return count
        except sqlite3.Error as e:
            logger.error(f"Error retrieving daily recommendations for user {user_id}: {e}")
            return 0

    def increment_recommendations(self, user_id: int):
        """Increment the daily recommendation count (errors are logged, not raised)."""
        try:
            today = datetime.now().strftime('%Y-%m-%d')
            with self.conn:
                self.cursor.execute(
                    '''
                    UPDATE users SET daily_recommendations = daily_recommendations + 1,
                    last_recommendation_date = ? WHERE user_id = ?
                    ''',
                    (today, user_id),
                )
            logger.info(f"Recommendations incremented for user {user_id}")
        except sqlite3.Error as e:
            logger.error(f"Error incrementing recommendations for user {user_id}: {e}")

    def generate_verification_code(self, user_id: int, phone: str) -> str:
        """Create and store a 6-digit verification code for ``(user_id, phone)``.

        The code comes from ``random.SystemRandom`` (OS entropy) because it
        is used as a secret. An existing code for the same pair is replaced.

        Returns:
            The generated code as a string of six digits.

        Raises:
            sqlite3.Error: If the code cannot be stored.
        """
        try:
            code = ''.join(self._secure_random.choices(string.digits, k=6))
            with self.conn:
                self.cursor.execute(
                    'INSERT OR REPLACE INTO verification_codes (user_id, phone, code) VALUES (?, ?, ?)',
                    (user_id, phone, code),
                )
            logger.info(f"Generated verification code for user {user_id} with phone {phone}")
            return code
        except sqlite3.Error as e:
            logger.error(f"Error generating verification code for user {user_id}: {e}")
            raise

    def verify_phone_code(self, user_id: int, code: str) -> bool:
        """Check a verification code for a user.

        A user may have pending codes for several phones (the table key is
        ``(user_id, phone)``), so the code is accepted if it matches any of
        them. On success every pending code of the user is deleted.

        Returns:
            True if the code matched, False otherwise or on database error.
        """
        try:
            self.cursor.execute(
                'SELECT 1 FROM verification_codes WHERE user_id = ? AND code = ?',
                (user_id, code),
            )
            if self.cursor.fetchone():
                with self.conn:
                    self.cursor.execute(
                        'DELETE FROM verification_codes WHERE user_id = ?', (user_id,)
                    )
                logger.info(f"Phone code verified for user {user_id}")
                return True
            logger.warning(f"Invalid phone code for user {user_id}")
            return False
        except sqlite3.Error as e:
            logger.error(f"Error verifying phone code for user {user_id}: {e}")
            return False

    def create_car(self, model: str, price: float, status: str, description: Optional[str] = None) -> int:
        """Create a minimal car ad.

        ``model`` is stored in the Arabic name, English name and model
        columns; ``price`` is stored as text; ``description`` goes to notes.

        Returns:
            The id of the new ad.

        Raises:
            sqlite3.Error: If the insert fails.
        """
        try:
            with self.conn:
                self.cursor.execute(
                    '''
                    INSERT INTO ads (price, car_name_ar, car_name_en, model, status, notes, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    ''',
                    (str(price), model, model, model, status, description, self._timestamp()),
                )
                ad_id = self.cursor.lastrowid
            logger.info(f"Created car ad {ad_id} with model {model}")
            return ad_id
        except sqlite3.Error as e:
            logger.error(f"Error creating car ad: {e}")
            raise

    def get_car_by_id(self, car_id: int) -> Optional[Dict[str, Any]]:
        """Return a summary dict (id, price, model, status, description, created_at) or None.

        Raises:
            sqlite3.Error: If the query fails.
        """
        try:
            self.cursor.execute(
                'SELECT id, price, car_name_ar AS model, status, notes AS description, created_at '
                'FROM ads WHERE id = ?',
                (car_id,),
            )
            car = self.cursor.fetchone()
            if car:
                columns = [desc[0] for desc in self.cursor.description]
                logger.info(f"Retrieved car {car_id}")
                return dict(zip(columns, car))
            logger.warning(f"Car {car_id} not found")
            return None
        except sqlite3.Error as e:
            logger.error(f"Error retrieving car {car_id}: {e}")
            raise

    def get_all_cars(self) -> List[Dict[str, Any]]:
        """Return summary dicts for all approved ads.

        Raises:
            sqlite3.Error: If the query fails.
        """
        try:
            # Single quotes: SQLite parses "approved" as an identifier first.
            self.cursor.execute(
                'SELECT id, price, car_name_ar AS model, status, notes AS description, created_at '
                "FROM ads WHERE status = 'approved'"
            )
            cars = self.cursor.fetchall()
            columns = [desc[0] for desc in self.cursor.description]
            cars_list = [dict(zip(columns, car)) for car in cars]
            logger.info(f"Retrieved {len(cars_list)} approved cars")
            return cars_list
        except sqlite3.Error as e:
            logger.error(f"Error retrieving all cars: {e}")
            raise

    def save_ad(self, ad_data: Dict[str, Any], user_id: int) -> int:
        """Insert a full ad for a user.

        Args:
            ad_data: Must contain price, car_name_ar, car_name_en, model,
                engine_specs, mileage, origin, plate, accidents, specs,
                location, phone, main_photo and photos (an iterable of
                strings, stored comma-joined); ``notes`` is optional.
            user_id: Owner of the ad.

        Returns:
            The id of the new ad.

        Raises:
            KeyError: If a required key is missing from ``ad_data``.
            sqlite3.Error: If the insert fails.
        """
        try:
            values = (
                user_id, ad_data['price'], ad_data['car_name_ar'], ad_data['car_name_en'],
                ad_data['model'], ad_data['engine_specs'], ad_data['mileage'], ad_data['origin'],
                ad_data['plate'], ad_data['accidents'], ad_data['specs'], ad_data.get('notes'),
                ad_data['location'], ad_data['phone'], ad_data['main_photo'],
                ','.join(ad_data['photos']), self._timestamp(),
            )
            with self.conn:
                self.cursor.execute(
                    '''
                    INSERT INTO ads (user_id, price, car_name_ar, car_name_en, model, engine_specs, mileage, origin, plate,
                    accidents, specs, notes, location, phone, main_photo, photos, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''',
                    values,
                )
                ad_id = self.cursor.lastrowid
            logger.info(f"Ad {ad_id} saved for user {user_id}")
            return ad_id
        except sqlite3.Error as e:
            logger.error(f"Error saving ad for user {user_id}: {e}")
            raise

    def get_ad(self, ad_id: int) -> Optional[Dict[str, Any]]:
        """Return the full ad as a dict (``photos`` as a list), or None if absent.

        Raises:
            sqlite3.Error: If the query fails.
        """
        try:
            self.cursor.execute('SELECT * FROM ads WHERE id = ?', (ad_id,))
            ads = self._fetch_ads()
            if ads:
                logger.info(f"Retrieved ad {ad_id}")
                return ads[0]
            logger.warning(f"Ad {ad_id} not found")
            return None
        except sqlite3.Error as e:
            logger.error(f"Error retrieving ad {ad_id}: {e}")
            raise

    def update_ad_field(self, ad_id: int, field: str, value: Any):
        """Update a single column of an ad.

        ``field`` is validated against the actual columns of ``ads`` before
        it is placed into the statement, because column names cannot be
        bound as parameters and must never come unchecked from callers.
        A list/tuple given for ``photos`` is stored comma-joined, matching
        ``save_ad``.

        Raises:
            ValueError: If ``field`` is not a column of ``ads``.
            sqlite3.Error: If the update fails.
        """
        try:
            self.cursor.execute('PRAGMA table_info(ads)')
            known_columns = {info[1] for info in self.cursor.fetchall()}
            if field not in known_columns:
                logger.error(f"Error updating field {field} for ad {ad_id}: unknown column")
                raise ValueError(f"Unknown ads column: {field!r}")
            if field == 'photos' and isinstance(value, (list, tuple)):
                value = ','.join(value)
            with self.conn:
                self.cursor.execute(f'UPDATE ads SET "{field}" = ? WHERE id = ?', (value, ad_id))
            logger.info(f"Updated field {field} for ad {ad_id}")
        except sqlite3.Error as e:
            logger.error(f"Error updating field {field} for ad {ad_id}: {e}")
            raise

    def update_ad_status(self, ad_id: int, status: str):
        """Set the status of an ad.

        Raises:
            sqlite3.Error: If the update fails.
        """
        try:
            with self.conn:
                self.cursor.execute('UPDATE ads SET status = ? WHERE id = ?', (status, ad_id))
            logger.info(f"Updated status to {status} for ad {ad_id}")
        except sqlite3.Error as e:
            logger.error(f"Error updating status for ad {ad_id}: {e}")
            raise

    def get_available_cars(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return approved ads, optionally only those owned by ``user_id``.

        Raises:
            sqlite3.Error: If the query fails.
        """
        try:
            if user_id is not None:
                self.cursor.execute(
                    "SELECT * FROM ads WHERE status = 'approved' AND user_id = ?", (user_id,)
                )
            else:
                self.cursor.execute("SELECT * FROM ads WHERE status = 'approved'")
            result = self._fetch_ads()
            logger.info(f"Retrieved {len(result)} available cars")
            return result
        except sqlite3.Error as e:
            logger.error(f"Error retrieving available cars: {e}")
            raise

    def save_channel_message(self, ad_id: int, message_id: int):
        """Store the channel message id that belongs to an ad.

        Raises:
            sqlite3.Error: If the update fails.
        """
        try:
            with self.conn:
                self.cursor.execute(
                    'UPDATE ads SET channel_message_id = ? WHERE id = ?', (message_id, ad_id)
                )
            logger.info(f"Saved channel message ID {message_id} for ad {ad_id}")
        except sqlite3.Error as e:
            logger.error(f"Error saving channel message for ad {ad_id}: {e}")
            raise

    def get_total_users(self) -> int:
        """Return ``stats.subscribers_count``; 1300 if the row is missing or on error."""
        try:
            self.cursor.execute("SELECT subscribers_count FROM stats WHERE id = 1")
            result = self.cursor.fetchone()
            count = result[0] if result else 1300
            logger.info(f"Retrieved total users: {count}")
            return count
        except sqlite3.Error as e:
            logger.error(f"Error retrieving total users: {e}")
            return 1300

    def search_ads(self, query: str) -> List[Dict[str, Any]]:
        """Search approved ads; every whitespace-separated term must match.

        Term handling:
          * four digits          -> substring match on ``model`` (year)
          * ``A-B``              -> numeric price between A and B thousand
          * ``A-``               -> numeric price of at least A thousand
          * number [+ currency]  -> substring match on ``price``
          * anything else        -> substring match on Arabic/English name,
                                    location or specs

        An empty query returns all approved ads.

        Raises:
            sqlite3.Error: If the query fails.
        """
        try:
            query = query.lower().strip()
            conditions = ["status = 'approved'"]
            params: List[Any] = []
            # str.split() drops empty terms, so "" adds no bogus '%%' filter.
            for term in query.split():
                if term.isdigit() and len(term) == 4:  # Model year (e.g., 2020)
                    conditions.append("model LIKE ?")
                    params.append(f"%{term}%")
                    continue

                # fullmatch: a term that merely starts with a digit ("4x4")
                # is a text term, not a price.
                price_match = self._PRICE_TERM_RE.fullmatch(term)
                if price_match is None:  # Car name, location, or specs
                    conditions.append(
                        "(LOWER(car_name_ar) LIKE ? OR LOWER(car_name_en) LIKE ? "
                        "OR LOWER(location) LIKE ? OR LOWER(specs) LIKE ?)"
                    )
                    params.extend([f"%{term}%"] * 4)
                    continue

                low_text, marker, high_text = price_match.groups()
                if marker == '-':
                    low_price = float(low_text) * 1000
                    if high_text:
                        bounds = sorted((low_price, float(high_text) * 1000))
                        conditions.append(f"{self._PRICE_AS_NUMBER_SQL} BETWEEN ? AND ?")
                        params.extend(bounds)
                    else:
                        # Open-ended range such as "20-": lower bound only.
                        conditions.append(f"{self._PRICE_AS_NUMBER_SQL} >= ?")
                        params.append(low_price)
                else:
                    # Use the digits as typed; formatting through float would
                    # turn "20" into "20.0" and never match the stored text.
                    conditions.append("LOWER(price) LIKE ?")
                    params.append(f'%{low_text}%{marker if marker else ""}%')

            where_clause = " AND ".join(conditions)
            self.cursor.execute(f'SELECT * FROM ads WHERE {where_clause}', params)
            result = self._fetch_ads()
            logger.info(f"Found {len(result)} ads for query: {query}")
            return result
        except sqlite3.Error as e:
            logger.error(f"Error searching ads for query {query}: {e}")
            raise

    def add_reaction(self, ad_id: int, ad: Dict[str, Any]):
        """Generate reactions for an ad via ``utils.generate_reactions`` and store them.

        Raises:
            ImportError: If ``utils.generate_reactions`` cannot be imported.
            sqlite3.Error: If the update fails.
        """
        try:
            # Imported here rather than at module level, as in the original.
            from utils import generate_reactions
            reactions = generate_reactions(ad)
            with self.conn:
                self.cursor.execute('UPDATE ads SET reactions = ? WHERE id = ?', (reactions, ad_id))
            logger.info(f"Added reactions for ad {ad_id}")
        except sqlite3.Error as e:
            logger.error(f"Error adding reactions for ad {ad_id}: {e}")
            raise
        except ImportError:
            logger.error(f"Failed to import generate_reactions from utils for ad {ad_id}")
            raise

    def extend_ad_duration(self, ad_id: int):
        """Extend an ad by setting its status back to ``approved``.

        Raises:
            sqlite3.Error: If the update fails.
        """
        try:
            with self.conn:
                self.cursor.execute("UPDATE ads SET status = 'approved' WHERE id = ?", (ad_id,))
            logger.info(f"Extended duration for ad {ad_id}")
        except sqlite3.Error as e:
            logger.error(f"Error extending duration for ad {ad_id}: {e}")
            raise

    def close(self):
        """Close the database connection (errors are logged, not raised)."""
        try:
            self.conn.close()
            logger.info("Database connection closed successfully")
        except sqlite3.Error as e:
            logger.error(f"Error closing database: {e}")