"""MongoDB data-access helpers for users, chats, questionnaires and schedules."""

import re
from datetime import datetime, timezone, timedelta
from typing import Optional

from bson import ObjectId
from pymongo import DESCENDING

from app.database.db import db


class DatabaseQuery:
    """Thin wrapper around the module-level ``db`` handle.

    Every method catches any exception raised by the underlying call and
    re-raises a plain ``Exception`` whose message is prefixed with a
    human-readable description of the failed operation. The original error
    is attached as ``__cause__``.
    """

    def __init__(self):
        pass

    # ------------------------------------------------------------------
    # Chat sessions and chats
    # ------------------------------------------------------------------

    def create_chat_session(self, chat_session):
        """Insert ``chat_session`` into the ``chat_sessions`` collection."""
        try:
            db.chat_sessions.insert_one(chat_session)
        except Exception as e:
            raise Exception(f"Error creating chat session: {e}") from e

    def get_user_chat_sessions(self, user_id):
        """Return the user's sessions (without ``_id``), most recently accessed first."""
        try:
            sessions = list(db.chat_sessions.find(
                {"user_id": user_id},
                {"_id": 0}
            ).sort("last_accessed", -1))
            return sessions
        except Exception as e:
            raise Exception(f"Error retrieving user chat sessions: {e}") from e

    def create_chat(self, chat_data):
        """Insert ``chat_data`` into the ``chats`` collection and return ``True``."""
        try:
            db.chats.insert_one(chat_data)
            return True
        except Exception as e:
            raise Exception(f"Error creating chat: {e}") from e

    def update_last_accessed_time(self, session_id):
        """Set ``last_accessed`` of the given session to the current UTC time."""
        try:
            db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"last_accessed": datetime.now(timezone.utc)}}
            )
        except Exception as e:
            raise Exception(f"Error updating last accessed time: {e}") from e

    def get_session_chats(self, session_id, user_id):
        """Return the chats of one session owned by ``user_id``, oldest first."""
        try:
            chats = list(db.chats.find(
                {"session_id": session_id, "user_id": user_id},
                {"_id": 0}
            ).sort("timestamp", 1))
            return chats
        except Exception as e:
            raise Exception(f"Error retrieving session chats: {e}") from e

    def get_user_by_identifier(self, identifier):
        """Return the user whose username or email equals ``identifier``, or ``None``."""
        try:
            user = db.users.find_one(
                {'$or': [{'username': identifier}, {'email': identifier}]}
            )
            return user
        except Exception as e:
            raise Exception(f"Error retrieving user by identifier: {e}") from e

    def add_token_to_blacklist(self, jti):
        """Record the token identifier ``jti`` in the ``blacklist`` collection."""
        try:
            db.blacklist.insert_one({'jti': jti})
        except Exception as e:
            raise Exception(f"Error adding token to blacklist: {e}") from e

    def create_indexes(self):
        """Create the indexes used by the chat session and chat queries."""
        try:
            db.chat_sessions.create_index([("user_id", 1), ("last_accessed", -1)])
            db.chat_sessions.create_index([("session_id", 1)])
            db.chats.create_index([("session_id", 1), ("timestamp", 1)])
            db.chats.create_index([("user_id", 1)])
        except Exception as e:
            raise Exception(f"Error creating indexes: {e}") from e

    def check_chat_session(self, session_id):
        """Return ``True`` if a chat session with ``session_id`` exists."""
        try:
            chat_session = db.chat_sessions.find_one({'session_id': session_id})
            return chat_session is not None
        except Exception as e:
            raise Exception(f"Error checking chat session: {e}") from e

    # ------------------------------------------------------------------
    # User profile
    # ------------------------------------------------------------------

    def get_user_profile(self, username):
        """Return the user document for ``username`` with ``password`` excluded."""
        try:
            user = db.users.find_one({'username': username}, {'password': 0})
            return user
        except Exception as e:
            raise Exception(f"Error getting user profile: {e}") from e

    def update_user_profile(self, username, update_fields):
        """Apply ``update_fields`` via ``$set``; return ``True`` if a document changed."""
        try:
            result = db.users.update_one(
                {'username': username},
                {'$set': update_fields}
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating user profile: {e}") from e

    def delete_user_account(self, username):
        """Delete the user document; return ``True`` if one was removed."""
        try:
            result = db.users.delete_one({'username': username})
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user account: {e}") from e

    def is_username_or_email_exists(self, username, email):
        """Return ``True`` if any user has the given username or email."""
        try:
            user = db.users.find_one(
                {'$or': [{'username': username}, {'email': email}]}
            )
            return user is not None
        except Exception as e:
            raise Exception(f"Error checking if username or email exists: {e}") from e

    # ------------------------------------------------------------------
    # Temporary (pending) users
    # ------------------------------------------------------------------

    def create_or_update_temp_user(self, username, email, temp_user_data):
        """Upsert ``temp_user_data`` into ``temp_users``, matched by username or email."""
        try:
            db.temp_users.update_one(
                {'$or': [{'username': username}, {'email': email}]},
                {'$set': temp_user_data},
                upsert=True
            )
        except Exception as e:
            raise Exception(f"Error creating/updating temp user: {e}") from e

    def get_temp_user_by_username(self, username):
        """Return the temp user document for ``username``, or ``None``."""
        try:
            temp_user = db.temp_users.find_one({'username': username})
            return temp_user
        except Exception as e:
            raise Exception(f"Error retrieving temp user by username: {e}") from e

    def delete_temp_user(self, username):
        """Delete the temp user document for ``username`` if present."""
        try:
            db.temp_users.delete_one({'username': username})
        except Exception as e:
            raise Exception(f"Error deleting temp user: {e}") from e

    # ------------------------------------------------------------------
    # User creation and verification
    # ------------------------------------------------------------------

    def create_user_from_data(self, user_data):
        """Insert ``user_data`` into ``users`` and return the same dict."""
        try:
            db.users.insert_one(user_data)
            return user_data
        except Exception as e:
            raise Exception(f"Error creating user from data: {e}") from e

    def create_user(self, username, email, hashed_password, name, age, created_at,
                    is_verified=False, verification_code=None, code_expiration=None):
        """Build a user document from the arguments, insert it and return it.

        ``verification_code`` and ``code_expiration`` are stored only when
        both are truthy.
        """
        try:
            new_user = {
                'username': username,
                'email': email,
                'password': hashed_password,
                'name': name,
                'age': age,
                'created_at': created_at,
                'is_verified': is_verified
            }
            if verification_code and code_expiration:
                new_user['verification_code'] = verification_code
                new_user['code_expiration'] = code_expiration

            db.users.insert_one(new_user)
            return new_user
        except Exception as e:
            raise Exception(f"Error creating user: {e}") from e

    def get_user_by_username(self, username):
        """Return the full user document for ``username``, or ``None``."""
        try:
            user = db.users.find_one({'username': username})
            return user
        except Exception as e:
            raise Exception(f"Error retrieving user by username: {e}") from e

    def verify_user_email(self, username):
        """Mark the user verified and drop the verification code fields.

        Returns ``True`` if a document was modified.
        """
        try:
            result = db.users.update_one(
                {'username': username},
                {
                    '$set': {'is_verified': True},
                    '$unset': {'verification_code': '', 'code_expiration': ''}
                }
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error verifying user email: {e}") from e

    def update_verification_code(self, username, verification_code, code_expiration):
        """Store a new verification code and expiry; return ``True`` if modified."""
        try:
            result = db.users.update_one(
                {'username': username},
                {'$set': {
                    'verification_code': verification_code,
                    'code_expiration': code_expiration
                }}
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating verification code: {e}") from e

    def is_valid_email(self, email):
        """Return ``True`` if ``email`` is a syntactically plausible address.

        The whole string must match (no trailing characters are tolerated),
        and the top-level domain may be any run of two or more letters.
        Non-string input is reported as invalid rather than raising.
        """
        try:
            if not isinstance(email, str):
                return False
            email_regex = r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
            # fullmatch anchors both ends, so "a@b.com junk" is rejected.
            return re.fullmatch(email_regex, email) is not None
        except Exception as e:
            raise Exception(f"Error validating email: {e}") from e

    # ------------------------------------------------------------------
    # Location
    # ------------------------------------------------------------------

    def add_or_update_location(self, username, location):
        """Upsert the user's location together with an ``updated_at`` UTC timestamp."""
        try:
            db.locations.update_one(
                {'username': username},
                {'$set': {
                    'location': location,
                    'updated_at': datetime.now(timezone.utc)
                }},
                upsert=True
            )
        except Exception as e:
            raise Exception(f"Error adding/updating location: {e}") from e

    def get_location(self, username):
        """Return the location document for ``username``, or ``None``."""
        try:
            location = db.locations.find_one({'username': username})
            return location
        except Exception as e:
            raise Exception(f"Error retrieving location: {e}") from e

    # ------------------------------------------------------------------
    # Questionnaires
    # ------------------------------------------------------------------

    def submit_questionnaire(self, user_id, answers):
        """Insert a questionnaire and return its inserted id as a string."""
        try:
            # One timestamp so created_at and updated_at agree on insert.
            now = datetime.now(timezone.utc)
            questionnaire_data = {
                'user_id': user_id,
                'answers': answers,
                'created_at': now,
                'updated_at': now
            }
            result = db.questionnaires.insert_one(questionnaire_data)
            return str(result.inserted_id)
        except Exception as e:
            raise Exception(f"Error submitting questionnaire: {e}") from e

    def get_latest_questionnaire(self, user_id):
        """Return the user's newest questionnaire with ``_id`` as a string, or ``None``."""
        try:
            questionnaire = db.questionnaires.find_one(
                {'user_id': user_id},
                sort=[('created_at', -1)]
            )
            if questionnaire:
                questionnaire['_id'] = str(questionnaire['_id'])
            return questionnaire
        except Exception as e:
            raise Exception(f"Error getting latest questionnaire: {e}") from e

    def update_questionnaire(self, questionnaire_id, user_id, answers):
        """Replace the answers of a questionnaire owned by ``user_id``.

        Returns ``True`` if a document was modified. An id that cannot be
        converted to ``ObjectId`` surfaces as the wrapped ``Exception``.
        """
        try:
            result = db.questionnaires.update_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id},
                {
                    '$set': {
                        'answers': answers,
                        'updated_at': datetime.now(timezone.utc)
                    }
                }
            )
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating questionnaire: {e}") from e

    def delete_questionnaire(self, questionnaire_id, user_id):
        """Delete a questionnaire owned by ``user_id``; return ``True`` if removed."""
        try:
            result = db.questionnaires.delete_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
            )
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting questionnaire: {e}") from e

    def count_answered_questions(self, username):
        """Count ``questions`` documents for ``username`` whose ``answer`` is not ``None``."""
        try:
            answered_count = db.questions.count_documents({
                'username': username,
                'answer': {'$ne': None}
            })
            return answered_count
        except Exception as e:
            raise Exception(f"Error counting answered questions: {e}") from e

    # ------------------------------------------------------------------
    # Preferences and theme
    # ------------------------------------------------------------------

    def get_user_preferences(self, username):
        """Return the user's preference flags, defaulting every flag to ``False``."""
        try:
            user_preferences = db.preferences.find_one({'username': username})
            if not user_preferences:
                return {
                    'keywords': False,
                    'references': False,
                    'websearch': False,
                    'personalized_recommendations': False,
                    'environmental_recommendations': False
                }
            return {
                'keywords': user_preferences.get('keywords', False),
                'references': user_preferences.get('references', False),
                'websearch': user_preferences.get('websearch', False),
                'personalized_recommendations': user_preferences.get(
                    'personalized_recommendations', False),
                'environmental_recommendations': user_preferences.get(
                    'environmental_recommendations', False)
            }
        except Exception as e:
            raise Exception(f"Error getting user preferences: {e}") from e

    def set_user_preferences(self, username, preferences):
        """Upsert the user's preference flags (coerced to ``bool``) and return them."""
        try:
            preferences_data = {
                'username': username,
                'keywords': bool(preferences.get('keywords', False)),
                'references': bool(preferences.get('references', False)),
                'websearch': bool(preferences.get('websearch', False)),
                'personalized_recommendations': bool(
                    preferences.get('personalized_recommendations', False)),
                'environmental_recommendations': bool(
                    preferences.get('environmental_recommendations', False)),
                'updated_at': datetime.now(timezone.utc)
            }
            db.preferences.update_one(
                {'username': username},
                {'$set': preferences_data},
                upsert=True
            )
            return preferences_data
        except Exception as e:
            raise Exception(f"Error setting user preferences: {e}") from e

    def get_user_theme(self, username):
        """Return the stored theme for ``username``, or ``'light'`` when none is stored."""
        try:
            user_theme = db.user_themes.find_one({'username': username})
            if not user_theme:
                return 'light'
            return user_theme.get('theme', 'light')
        except Exception as e:
            raise Exception(f"Error getting user theme: {e}") from e

    def set_user_theme(self, username, theme):
        """Upsert the user's theme and return the stored data.

        Any truthy ``theme`` is stored as ``"dark"``, anything falsy as
        ``"light"``.
        """
        try:
            # NOTE(review): presumably callers pass a bool; a non-empty string
            # such as "light" would be stored as "dark" — confirm against callers.
            theme_data = {
                'username': username,
                'theme': "dark" if theme else "light",
                'updated_at': datetime.now(timezone.utc)
            }
            db.user_themes.update_one(
                {'username': username},
                {'$set': theme_data},
                upsert=True
            )
            return theme_data
        except Exception as e:
            raise Exception(f"Error setting user theme: {e}") from e

    # ------------------------------------------------------------------
    # Session management
    # ------------------------------------------------------------------

    def verify_session(self, session_id, user_id):
        """Return ``True`` if the session exists and belongs to ``user_id``."""
        try:
            session = db.chat_sessions.find_one({
                "session_id": session_id,
                "user_id": user_id
            })
            return session is not None
        except Exception as e:
            raise Exception(f"Error verifying session: {e}") from e

    def update_chat_session_title(self, session_id, new_title):
        """Set the session title; return ``True`` if the stored value changed.

        Raises ``Exception`` (with the standard prefix) when no session
        matches ``session_id``.
        """
        try:
            result = db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"title": new_title}}
            )
            if result.matched_count == 0:
                raise Exception("Chat session not found")
            return result.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating chat session title: {e}") from e

    def delete_chat_session(self, session_id, user_id):
        """Delete one session and its chats; return what was deleted."""
        try:
            session_result = db.chat_sessions.delete_one({
                "session_id": session_id,
                "user_id": user_id
            })
            chats_result = db.chats.delete_many({
                "session_id": session_id,
                "user_id": user_id
            })
            return {
                "session_deleted": session_result.deleted_count > 0,
                "chats_deleted": chats_result.deleted_count
            }
        except Exception as e:
            raise Exception(f"Error deleting chat session and chats: {e}") from e

    def delete_all_user_sessions_and_chats(self, user_id):
        """Delete every chat and session of ``user_id``; return the deleted counts."""
        try:
            chats_result = db.chats.delete_many({"user_id": user_id})
            sessions_result = db.chat_sessions.delete_many({"user_id": user_id})
            return {
                "deleted_chats": chats_result.deleted_count,
                "deleted_sessions": sessions_result.deleted_count
            }
        except Exception as e:
            raise Exception(f"Error deleting user sessions and chats: {e}") from e

    def get_all_user_chats(self, user_id):
        """Return every session of the user with its chats nested under ``"chats"``.

        Sessions are ordered by ``last_accessed`` descending; chats inside a
        session by ``timestamp`` ascending. One chat query is issued per
        session.
        """
        try:
            sessions = list(db.chat_sessions.find(
                {"user_id": user_id},
                {"_id": 0}
            ).sort("last_accessed", -1))

            all_chats = []
            for session in sessions:
                session_chats = list(db.chats.find(
                    {"session_id": session["session_id"], "user_id": user_id},
                    {"_id": 0}
                ).sort("timestamp", 1))
                all_chats.append({
                    "session_id": session["session_id"],
                    "title": session.get("title", "New Chat"),
                    "created_at": session.get("created_at"),
                    "last_accessed": session.get("last_accessed"),
                    "chats": session_chats
                })
            return all_chats
        except Exception as e:
            raise Exception(f"Error retrieving all user chats: {e}") from e

    # ------------------------------------------------------------------
    # Password reset
    # ------------------------------------------------------------------

    def store_reset_token(self, email, token, expiration):
        """Upsert the reset ``token`` and its ``expiration`` for ``email``."""
        try:
            db.password_resets.update_one(
                {'email': email},
                {
                    '$set': {
                        'token': token,
                        'expiration': expiration
                    }
                },
                upsert=True
            )
        except Exception as e:
            raise Exception(f"Error storing reset token: {e}") from e

    def verify_reset_token(self, token):
        """Return the reset record for ``token`` if it has not expired, else ``None``."""
        try:
            reset_info = db.password_resets.find_one({
                'token': token,
                'expiration': {'$gt': datetime.now(timezone.utc)}
            })
            return reset_info
        except Exception as e:
            raise Exception(f"Error verifying reset token: {e}") from e

    def update_password(self, email, new_password):
        """Store ``new_password`` as the ``password`` of the user with ``email``.

        The value is written as given; no hashing happens here.
        """
        try:
            db.users.update_one(
                {'email': email},
                {'$set': {'password': new_password}}
            )
        except Exception as e:
            raise Exception(f"Error updating password: {e}") from e

    def delete_reset_token(self, token):
        """Delete the password reset record for ``token`` if present."""
        try:
            db.password_resets.delete_one({'token': token})
        except Exception as e:
            raise Exception(f"Error deleting reset token: {e}") from e

    def delete_account_permanently(self, username):
        """Delete the user and the related data listed below; return the counts.

        Removes chats, chat sessions, preferences, theme, location,
        questionnaires and finally the user document itself.
        """
        try:
            # NOTE(review): username is passed where other methods take user_id;
            # assumes both hold the same value — confirm against callers.
            chat_deletion_result = self.delete_all_user_sessions_and_chats(username)

            preferences_result = db.preferences.delete_one({'username': username})
            theme_result = db.user_themes.delete_one({'username': username})
            location_result = db.locations.delete_one({'username': username})
            questionnaire_result = db.questionnaires.delete_many({'user_id': username})
            user_result = db.users.delete_one({'username': username})

            return {
                'success': True,
                'deleted_data': {
                    'chats': chat_deletion_result['deleted_chats'],
                    'chat_sessions': chat_deletion_result['deleted_sessions'],
                    'preferences': preferences_result.deleted_count,
                    'theme': theme_result.deleted_count,
                    'location': location_result.deleted_count,
                    'questionnaires': questionnaire_result.deleted_count,
                    'user_account': user_result.deleted_count
                }
            }
        except Exception as e:
            raise Exception(f"Error deleting account permanently: {e}") from e

    # ------------------------------------------------------------------
    # Language
    # ------------------------------------------------------------------

    def get_user_language(self, user_id):
        """Return the stored language for ``user_id``, or ``None`` when none is stored."""
        try:
            language = db.languages.find_one({'user_id': user_id})
            return language.get('language') if language else None
        except Exception as e:
            raise Exception(f"Error retrieving user language: {e}") from e

    def set_user_language(self, user_id, language):
        """Upsert the user's language and return the stored data."""
        try:
            language_data = {
                'user_id': user_id,
                'language': language,
                'updated_at': datetime.now(timezone.utc)
            }
            db.languages.update_one(
                {'user_id': user_id},
                {'$set': language_data},
                upsert=True
            )
            return language_data
        except Exception as e:
            raise Exception(f"Error setting user language: {e}") from e

    def delete_user_language(self, user_id):
        """Delete the user's language record; return ``True`` if one was removed."""
        try:
            result = db.languages.delete_one({'user_id': user_id})
            return result.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user language: {e}") from e

    # ------------------------------------------------------------------
    # Skin schedules
    # ------------------------------------------------------------------

    def get_today_schedule(self, user_id):
        """Return a schedule created during the current UTC day, or ``None``."""
        try:
            # Half-open range [midnight, next midnight) so the final second of
            # the day is covered as well.
            day_start = datetime.now(timezone.utc).replace(
                hour=0, minute=0, second=0, microsecond=0)
            next_day_start = day_start + timedelta(days=1)

            schedule = db.skin_schedules.find_one({
                "user_id": user_id,
                "created_at": {
                    "$gte": day_start,
                    "$lt": next_day_start
                }
            })
            return schedule
        except Exception as e:
            raise Exception(f"Error retrieving today's schedule: {e}") from e

    def save_schedule(self, user_id, schedule_data):
        """Insert today's schedule unless one exists; return the schedule id string.

        When a schedule for the current UTC day already exists, its id is
        returned and ``schedule_data`` is not stored.
        """
        try:
            existing_schedule = self.get_today_schedule(user_id)
            if existing_schedule:
                return str(existing_schedule["_id"])

            schedule = {
                "user_id": user_id,
                "schedule_data": schedule_data,
                "created_at": datetime.now(timezone.utc)
            }
            result = db.skin_schedules.insert_one(schedule)
            return str(result.inserted_id)
        except Exception as e:
            raise Exception(f"Error saving schedule: {e}") from e

    def get_last_seven_days_schedules(self, user_id):
        """Return schedules created in the last 7 days (rolling), newest first."""
        try:
            seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
            schedules = db.skin_schedules.find({
                "user_id": user_id,
                "created_at": {"$gte": seven_days_ago}
            }).sort("created_at", -1)
            return list(schedules)
        except Exception as e:
            raise Exception(f"Error fetching last 7 days schedules: {e}") from e

    # ------------------------------------------------------------------
    # RAG interactions
    # ------------------------------------------------------------------

    def save_rag_interaction(self, user_id: str, session_id: str, context: str,
                             query: str, response: str,
                             rag_start_time: datetime, rag_end_time: datetime):
        """Store one RAG interaction and return its generated ``interaction_id``.

        The start and end times are converted to UTC before being stored.
        """
        try:
            interaction = {
                "interaction_id": str(ObjectId()),
                "user_id": user_id,
                "session_id": session_id,
                "context": context,
                "query": query,
                "response": response,
                "rag_start_time": rag_start_time.astimezone(timezone.utc),
                "rag_end_time": rag_end_time.astimezone(timezone.utc),
                "created_at": datetime.now(timezone.utc)
            }
            db.rag_interactions.insert_one(interaction)
            return interaction["interaction_id"]
        except Exception as e:
            raise Exception(f"Error saving RAG interaction: {e}") from e

    def get_rag_interactions(
        self,
        user_id: Optional[str] = None,
        page: int = 1,
        page_size: int = 5
    ) -> dict:
        """Return one page of RAG interactions, newest first.

        Args:
            user_id: When truthy, only this user's interactions are returned.
            page: 1-based page number.
            page_size: Number of interactions per page.

        Returns:
            A dict with ``total_interactions``, ``page``, ``page_size``,
            ``total_pages`` and ``results``. The three datetime fields of
            each result are converted to ISO 8601 strings.

        Raises:
            Exception: On any failure, including ``page`` or ``page_size``
                below 1.
        """
        try:
            # Reject values that would produce a negative skip or a division
            # by zero further down.
            if page < 1 or page_size < 1:
                raise ValueError("page and page_size must be positive integers")

            query_filter = {}
            if user_id:
                query_filter["user_id"] = user_id

            skip = (page - 1) * page_size
            total = db.rag_interactions.count_documents(query_filter)

            interactions = db.rag_interactions.find(
                query_filter,
                {"_id": 0}
            ).sort("created_at", DESCENDING).skip(skip).limit(page_size)

            result_list = []
            for interaction in interactions:
                for field in ("rag_start_time", "rag_end_time", "created_at"):
                    interaction[field] = interaction[field].isoformat()
                result_list.append(interaction)

            return {
                "total_interactions": total,
                "page": page,
                "page_size": page_size,
                # Ceiling division without floats.
                "total_pages": (total + page_size - 1) // page_size,
                "results": result_list
            }
        except Exception as e:
            raise Exception(f"Error retrieving RAG interactions: {e}") from e

    # ------------------------------------------------------------------
    # Image uploads
    # ------------------------------------------------------------------

    def log_image_upload(self, user_id):
        """Log an image upload for a user and return ``True``."""
        try:
            timestamp = datetime.now(timezone.utc)  # timezone-aware
            db.image_uploads.insert_one({
                "user_id": user_id,
                "timestamp": timestamp
            })
            return True
        except Exception as e:
            raise Exception(f"Error logging image upload: {e}") from e

    def get_user_daily_uploads(self, user_id):
        """Get number of images uploaded by user in the last 24 hours."""
        try:
            now = datetime.now(timezone.utc)
            yesterday = now - timedelta(days=1)
            count = db.image_uploads.count_documents({
                "user_id": user_id,
                "timestamp": {"$gte": yesterday}
            })
            return count
        except Exception as e:
            raise Exception(f"Error retrieving user daily uploads: {e}") from e

    def get_user_last_upload_time(self, user_id):
        """Get the timestamp of user's most recent image upload, or ``None``."""
        try:
            last_upload = db.image_uploads.find_one(
                {"user_id": user_id},
                sort=[("timestamp", DESCENDING)]
            )
            return last_upload["timestamp"] if last_upload else None
        except Exception as e:
            raise Exception(f"Error retrieving last upload time: {e}") from e