Spaces:
Sleeping
Sleeping
from app.database.db import db | |
import re | |
from bson import ObjectId | |
from datetime import datetime, timezone, timedelta | |
from pymongo import DESCENDING | |
from typing import Optional | |
class DatabaseQuery: | |
def __init__(self):
    """Create the query helper; no instance attributes are initialised."""
    pass
def create_chat_session(self, chat_session):
    """Insert one document into the ``chat_sessions`` collection.

    Args:
        chat_session: Document handed to ``insert_one`` unchanged.

    Raises:
        Exception: If the insert fails; the underlying error text is
            embedded in the message.
    """
    try:
        sessions = db.chat_sessions
        sessions.insert_one(chat_session)
    except Exception as e:
        raise Exception(f"Error creating chat session: {str(e)}")
def get_user_chat_sessions(self, user_id):
    """Return every chat session of a user, most recently accessed first.

    Args:
        user_id: Value matched against the ``user_id`` field.

    Returns:
        A list of session documents with the ``_id`` field projected out,
        sorted by ``last_accessed`` descending.

    Raises:
        Exception: If the query fails.
    """
    try:
        owner_filter = {"user_id": user_id}
        without_id = {"_id": 0}
        cursor = db.chat_sessions.find(owner_filter, without_id)
        return list(cursor.sort("last_accessed", -1))
    except Exception as e:
        raise Exception(f"Error retrieving user chat sessions: {str(e)}")
def create_chat(self, chat_data):
    """Insert one chat document into the ``chats`` collection.

    Args:
        chat_data: Document handed to ``insert_one`` unchanged.

    Returns:
        ``True`` once the insert call has returned.

    Raises:
        Exception: If the insert fails.
    """
    try:
        chats = db.chats
        chats.insert_one(chat_data)
    except Exception as e:
        raise Exception(f"Error creating chat: {str(e)}")
    else:
        return True
def update_last_accessed_time(self, session_id):
    """Stamp a chat session's ``last_accessed`` field with the current UTC time.

    Args:
        session_id: Value matched against the ``session_id`` field.

    Raises:
        Exception: If the update fails.
    """
    try:
        touch = {"$set": {"last_accessed": datetime.now(timezone.utc)}}
        db.chat_sessions.update_one({"session_id": session_id}, touch)
    except Exception as e:
        raise Exception(f"Error updating last accessed time: {str(e)}")
def get_session_chats(self, session_id, user_id):
    """Return the chats of one session owned by one user, oldest first.

    Args:
        session_id: Value matched against the ``session_id`` field.
        user_id: Value matched against the ``user_id`` field.

    Returns:
        A list of chat documents without ``_id``, sorted by ``timestamp``
        ascending.

    Raises:
        Exception: If the query fails.
    """
    try:
        selector = {"session_id": session_id, "user_id": user_id}
        cursor = db.chats.find(selector, {"_id": 0})
        ordered = cursor.sort("timestamp", 1)
        return list(ordered)
    except Exception as e:
        raise Exception(f"Error retrieving session chats: {str(e)}")
def get_user_by_identifier(self, identifier):
    """Look up a user whose ``username`` or ``email`` equals ``identifier``.

    Returns:
        The result of ``find_one`` on the ``users`` collection.

    Raises:
        Exception: If the query fails.
    """
    try:
        either_field = [{'username': identifier}, {'email': identifier}]
        return db.users.find_one({'$or': either_field})
    except Exception as e:
        raise Exception(f"Error retrieving user by identifier: {str(e)}")
def add_token_to_blacklist(self, jti):
    """Record a token identifier in the ``blacklist`` collection.

    Args:
        jti: Value stored under the ``jti`` key of the new document.

    Raises:
        Exception: If the insert fails.
    """
    try:
        entry = {'jti': jti}
        db.blacklist.insert_one(entry)
    except Exception as e:
        raise Exception(f"Error adding token to blacklist: {str(e)}")
def create_indexes(self):
    """Create the indexes used by the chat session and chat queries.

    Four indexes are requested, in this order: ``chat_sessions`` on
    (``user_id`` asc, ``last_accessed`` desc), ``chat_sessions`` on
    ``session_id``, ``chats`` on (``session_id``, ``timestamp``) and
    ``chats`` on ``user_id``.

    Raises:
        Exception: If any ``create_index`` call fails.
    """
    index_plan = (
        ("chat_sessions", [("user_id", 1), ("last_accessed", -1)]),
        ("chat_sessions", [("session_id", 1)]),
        ("chats", [("session_id", 1), ("timestamp", 1)]),
        ("chats", [("user_id", 1)]),
    )
    try:
        for collection_name, keys in index_plan:
            getattr(db, collection_name).create_index(keys)
    except Exception as e:
        raise Exception(f"Error creating indexes: {str(e)}")
def check_chat_session(self, session_id):
    """Report whether a chat session with this ``session_id`` exists.

    Returns:
        ``True`` when ``find_one`` returns a document, ``False`` otherwise.

    Raises:
        Exception: If the query fails.
    """
    try:
        found = db.chat_sessions.find_one({'session_id': session_id})
    except Exception as e:
        raise Exception(f"Error checking chat session: {str(e)}")
    return found is not None
def get_user_profile(self, username):
    """Fetch a user document by ``username`` with ``password`` projected out.

    Returns:
        The result of ``find_one`` on the ``users`` collection.

    Raises:
        Exception: If the query fails.
    """
    try:
        hide_password = {'password': 0}
        return db.users.find_one({'username': username}, hide_password)
    except Exception as e:
        raise Exception(f"Error getting user profile: {str(e)}")
def update_user_profile(self, username, update_fields):
    """Apply ``$set`` with ``update_fields`` to the user named ``username``.

    Returns:
        ``True`` when ``modified_count`` is greater than zero, so an update
        that matches but changes nothing yields ``False``.

    Raises:
        Exception: If the update fails.
    """
    try:
        selector = {'username': username}
        changes = {'$set': update_fields}
        outcome = db.users.update_one(selector, changes)
        return outcome.modified_count > 0
    except Exception as e:
        raise Exception(f"Error updating user profile: {str(e)}")
def delete_user_account(self, username):
    """Delete the document in ``users`` whose ``username`` matches.

    Only the ``users`` collection is touched here.

    Returns:
        ``True`` when a document was deleted, ``False`` otherwise.

    Raises:
        Exception: If the delete fails.
    """
    try:
        outcome = db.users.delete_one({'username': username})
        removed = outcome.deleted_count
        return removed > 0
    except Exception as e:
        raise Exception(f"Error deleting user account: {str(e)}")
def is_username_or_email_exists(self, username, email):
    """Report whether any user already has this ``username`` or this ``email``.

    Returns:
        ``True`` when a matching document is found in ``users``.

    Raises:
        Exception: If the query fails.
    """
    try:
        clash_filter = {'$or': [{'username': username}, {'email': email}]}
        existing = db.users.find_one(clash_filter)
    except Exception as e:
        raise Exception(f"Error checking if username or email exists: {str(e)}")
    return existing is not None
def create_or_update_temp_user(self, username, email, temp_user_data):
    """Upsert a pending registration in the ``temp_users`` collection.

    The document is matched on ``username`` OR ``email`` and updated with
    ``$set: temp_user_data``; a new document is inserted when none matches.

    NOTE(review): ``username`` and ``email`` are only used in the filter, so
    a newly inserted document presumably relies on ``temp_user_data`` to
    carry those fields - confirm against the caller.

    Raises:
        Exception: If the update fails.
    """
    try:
        selector = {'$or': [{'username': username}, {'email': email}]}
        changes = {'$set': temp_user_data}
        db.temp_users.update_one(selector, changes, upsert=True)
    except Exception as e:
        raise Exception(f"Error creating/updating temp user: {str(e)}")
def get_temp_user_by_username(self, username):
    """Fetch the ``temp_users`` document whose ``username`` matches.

    Returns:
        The result of ``find_one``.

    Raises:
        Exception: If the query fails.
    """
    try:
        return db.temp_users.find_one({'username': username})
    except Exception as e:
        raise Exception(f"Error retrieving temp user by username: {str(e)}")
def delete_temp_user(self, username):
    """Delete one ``temp_users`` document whose ``username`` matches.

    Raises:
        Exception: If the delete fails.
    """
    try:
        selector = {'username': username}
        db.temp_users.delete_one(selector)
    except Exception as e:
        raise Exception(f"Error deleting temp user: {str(e)}")
def create_user_from_data(self, user_data):
    """Insert a ready-made user document into ``users`` and hand it back.

    Args:
        user_data: Document passed to ``insert_one`` unchanged.

    Returns:
        The same ``user_data`` object that was passed in.

    Raises:
        Exception: If the insert fails.
    """
    try:
        users = db.users
        users.insert_one(user_data)
    except Exception as e:
        raise Exception(f"Error creating user from data: {str(e)}")
    return user_data
def create_user(self, username, email, hashed_password, name, age, created_at,
                is_verified=False, verification_code=None, code_expiration=None):
    """Build a user document from the given fields and insert it into ``users``.

    ``verification_code`` and ``code_expiration`` are stored only when both
    are truthy; ``hashed_password`` is stored under the ``password`` key.

    Returns:
        The dict that was inserted.

    Raises:
        Exception: If the insert fails.
    """
    try:
        record = dict(
            username=username,
            email=email,
            password=hashed_password,
            name=name,
            age=age,
            created_at=created_at,
            is_verified=is_verified,
        )
        has_verification = verification_code and code_expiration
        if has_verification:
            record.update(
                verification_code=verification_code,
                code_expiration=code_expiration,
            )
        db.users.insert_one(record)
        return record
    except Exception as e:
        raise Exception(f"Error creating user: {str(e)}")
def get_user_by_username(self, username):
    """Fetch the full ``users`` document whose ``username`` matches.

    No projection is applied, so every stored field is returned.

    Raises:
        Exception: If the query fails.
    """
    try:
        return db.users.find_one({'username': username})
    except Exception as e:
        raise Exception(f"Error retrieving user by username: {str(e)}")
def verify_user_email(self, username):
    """Mark a user as verified and drop the stored verification fields.

    Sets ``is_verified`` to ``True`` and unsets ``verification_code`` and
    ``code_expiration`` in a single update.

    Returns:
        ``True`` when the update modified a document.

    Raises:
        Exception: If the update fails.
    """
    try:
        changes = {
            '$set': {'is_verified': True},
            '$unset': {'verification_code': '', 'code_expiration': ''},
        }
        outcome = db.users.update_one({'username': username}, changes)
        return outcome.modified_count > 0
    except Exception as e:
        raise Exception(f"Error verifying user email: {str(e)}")
def update_verification_code(self, username, verification_code, code_expiration):
    """Store a new verification code and its expiration on a user.

    Returns:
        ``True`` when the update modified a document.

    Raises:
        Exception: If the update fails.
    """
    try:
        new_values = {
            'verification_code': verification_code,
            'code_expiration': code_expiration,
        }
        outcome = db.users.update_one({'username': username}, {'$set': new_values})
        return outcome.modified_count > 0
    except Exception as e:
        raise Exception(f"Error updating verification code: {str(e)}")
def is_valid_email(self, email):
    """Return whether ``email`` looks like a syntactically valid address.

    The whole string must match ``local@domain.tld``: the previous pattern
    was only anchored at the start, so trailing text after a valid address
    was accepted, and it capped the top-level domain at four letters, which
    rejected valid longer ones.

    Args:
        email: Candidate address. Anything that is not a ``str`` is treated
            as invalid instead of raising.

    Returns:
        ``True`` if the entire string matches the pattern, else ``False``.
    """
    if not isinstance(email, str):
        return False
    # fullmatch anchors both ends, so neither a suffix nor a trailing
    # newline can slip through.
    email_regex = r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
    return re.fullmatch(email_regex, email) is not None
def add_or_update_location(self, username, location):
    """Upsert a user's location in the ``locations`` collection.

    Stores ``location`` and sets ``updated_at`` to the current UTC time on
    the document matched by ``username``, inserting one if none exists.

    Raises:
        Exception: If the update fails.
    """
    try:
        payload = {'location': location, 'updated_at': datetime.now(timezone.utc)}
        db.locations.update_one({'username': username}, {'$set': payload}, upsert=True)
    except Exception as e:
        raise Exception(f"Error adding/updating location: {str(e)}")
def get_location(self, username):
    """Fetch the ``locations`` document whose ``username`` matches.

    Returns:
        The result of ``find_one``.

    Raises:
        Exception: If the query fails.
    """
    try:
        return db.locations.find_one({'username': username})
    except Exception as e:
        raise Exception(f"Error retrieving location: {str(e)}")
def submit_questionnaire(self, user_id, answers):
    """Insert a new questionnaire document for a user.

    ``created_at`` and ``updated_at`` are each set from a separate call to
    ``datetime.now(timezone.utc)``.

    Returns:
        The inserted document's id converted to ``str``.

    Raises:
        Exception: If the insert fails.
    """
    try:
        document = dict(
            user_id=user_id,
            answers=answers,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        inserted = db.questionnaires.insert_one(document)
        return str(inserted.inserted_id)
    except Exception as e:
        raise Exception(f"Error submitting questionnaire: {str(e)}")
def get_latest_questionnaire(self, user_id):
    """Return the user's questionnaire with the newest ``created_at``.

    Returns:
        The document with ``_id`` converted to ``str``, or the falsy
        ``find_one`` result unchanged when nothing is found.

    Raises:
        Exception: If the query fails.
    """
    try:
        newest_first = [('created_at', -1)]
        latest = db.questionnaires.find_one({'user_id': user_id}, sort=newest_first)
        if not latest:
            return latest
        latest['_id'] = str(latest['_id'])
        return latest
    except Exception as e:
        raise Exception(f"Error getting latest questionnaire: {str(e)}")
def update_questionnaire(self, questionnaire_id, user_id, answers):
    """Replace the answers of one questionnaire owned by ``user_id``.

    ``questionnaire_id`` is converted with ``ObjectId(...)``; the update
    also refreshes ``updated_at`` with the current UTC time.

    Returns:
        ``True`` when the update modified a document.

    Raises:
        Exception: If the id conversion or the update fails.
    """
    try:
        selector = {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
        new_values = {'answers': answers, 'updated_at': datetime.now(timezone.utc)}
        outcome = db.questionnaires.update_one(selector, {'$set': new_values})
        return outcome.modified_count > 0
    except Exception as e:
        raise Exception(f"Error updating questionnaire: {str(e)}")
def delete_questionnaire(self, questionnaire_id, user_id):
    """Delete one questionnaire, matched on both its id and ``user_id``.

    Returns:
        ``True`` when a document was deleted.

    Raises:
        Exception: If the id conversion or the delete fails.
    """
    try:
        selector = {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
        outcome = db.questionnaires.delete_one(selector)
        return outcome.deleted_count > 0
    except Exception as e:
        raise Exception(f"Error deleting questionnaire: {str(e)}")
def count_answered_questions(self, username):
    """Count a user's ``questions`` documents whose ``answer`` is not ``None``.

    Returns:
        The value returned by ``count_documents``.

    Raises:
        Exception: If the count fails.
    """
    try:
        answered_filter = {'username': username, 'answer': {'$ne': None}}
        return db.questions.count_documents(answered_filter)
    except Exception as e:
        raise Exception(f"Error counting answered questions: {str(e)}")
def get_user_preferences(self, username):
    """Return the five preference flags stored for a user.

    Each flag falls back to ``False`` when no preferences document exists
    or when the document lacks that key.

    Returns:
        A dict with the keys ``keywords``, ``references``, ``websearch``,
        ``personalized_recommendations`` and
        ``environmental_recommendations``.

    Raises:
        Exception: If the query fails.
    """
    flag_names = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )
    try:
        stored = db.preferences.find_one({'username': username})
        source = stored if stored else {}
        return {flag: source.get(flag, False) for flag in flag_names}
    except Exception as e:
        raise Exception(f"Error getting user preferences: {str(e)}")
def set_user_preferences(self, username, preferences):
    """Upsert the five preference flags for a user.

    Each flag is read from ``preferences`` with ``.get(..., False)`` and
    coerced with ``bool``; ``updated_at`` is set to the current UTC time.

    Returns:
        The dict that was written (``username``, the flags, ``updated_at``).

    Raises:
        Exception: If reading ``preferences`` or the update fails.
    """
    flag_names = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )
    try:
        record = {'username': username}
        for flag in flag_names:
            record[flag] = bool(preferences.get(flag, False))
        record['updated_at'] = datetime.now(timezone.utc)
        db.preferences.update_one({'username': username}, {'$set': record}, upsert=True)
        return record
    except Exception as e:
        raise Exception(f"Error setting user preferences: {str(e)}")
def get_user_theme(self, username):
    """Return the stored theme name for a user, defaulting to ``'light'``.

    ``'light'`` is returned both when no ``user_themes`` document exists
    and when the document has no ``theme`` key.

    Raises:
        Exception: If the query fails.
    """
    try:
        stored = db.user_themes.find_one({'username': username})
        return stored.get('theme', 'light') if stored else 'light'
    except Exception as e:
        raise Exception(f"Error getting user theme: {str(e)}")
def set_user_theme(self, username, theme):
    """Upsert a user's theme as either ``"dark"`` or ``"light"``.

    Args:
        username: Value matched against the ``username`` field.
        theme: Truthy values select ``"dark"`` and falsy values select
            ``"light"``, as before. In addition the string ``"light"``
            (any case, surrounding whitespace ignored) now selects
            ``"light"``; previously it was merely truthy and was stored as
            ``"dark"``, so writing back the value returned by
            ``get_user_theme`` flipped the theme.

    Returns:
        The dict that was written (``username``, ``theme``, ``updated_at``).

    Raises:
        Exception: If the update fails.
    """
    try:
        use_dark = bool(theme)
        if isinstance(theme, str) and theme.strip().lower() == "light":
            use_dark = False
        theme_data = {
            'username': username,
            'theme': "dark" if use_dark else "light",
            'updated_at': datetime.now(timezone.utc)
        }
        db.user_themes.update_one(
            {'username': username},
            {'$set': theme_data},
            upsert=True
        )
        return theme_data
    except Exception as e:
        raise Exception(f"Error setting user theme: {str(e)}") from e
def verify_session(self, session_id, user_id):
    """Report whether a chat session with this id exists for this user.

    Returns:
        ``True`` when a ``chat_sessions`` document matches both
        ``session_id`` and ``user_id``.

    Raises:
        Exception: If the query fails.
    """
    try:
        selector = {"session_id": session_id, "user_id": user_id}
        owned = db.chat_sessions.find_one(selector)
    except Exception as e:
        raise Exception(f"Error verifying session: {str(e)}")
    return owned is not None
def update_chat_session_title(self, session_id, new_title):
    """Set the ``title`` of the chat session matched by ``session_id``.

    The filter uses ``session_id`` only; no owner check happens here.

    Returns:
        ``True`` when the stored title actually changed.

    Raises:
        Exception: If no session matches (the "not found" error is raised
            inside the ``try`` and therefore re-wrapped with the same
            prefix as any other failure) or if the update fails.
    """
    try:
        outcome = db.chat_sessions.update_one(
            {"session_id": session_id},
            {"$set": {"title": new_title}},
        )
        if not outcome.matched_count:
            raise Exception("Chat session not found")
        return outcome.modified_count > 0
    except Exception as e:
        raise Exception(f"Error updating chat session title: {str(e)}")
def delete_chat_session(self, session_id, user_id):
    """Delete one chat session and all of its chats for a given user.

    The session document is deleted first, then every ``chats`` document
    with the same ``session_id`` and ``user_id``.

    Returns:
        A dict with ``session_deleted`` (bool) and ``chats_deleted``
        (number of chat documents removed).

    Raises:
        Exception: If either delete fails.
    """
    try:
        removed_session = db.chat_sessions.delete_one(
            {"session_id": session_id, "user_id": user_id}
        )
        removed_chats = db.chats.delete_many(
            {"session_id": session_id, "user_id": user_id}
        )
        summary = {}
        summary["session_deleted"] = removed_session.deleted_count > 0
        summary["chats_deleted"] = removed_chats.deleted_count
        return summary
    except Exception as e:
        raise Exception(f"Error deleting chat session and chats: {str(e)}")
def delete_all_user_sessions_and_chats(self, user_id):
    """Delete every chat and every chat session belonging to a user.

    Chats are removed before sessions.

    Returns:
        A dict with the counts ``deleted_chats`` and ``deleted_sessions``.

    Raises:
        Exception: If either delete fails.
    """
    try:
        removed_chats = db.chats.delete_many({"user_id": user_id})
        removed_sessions = db.chat_sessions.delete_many({"user_id": user_id})
        return dict(
            deleted_chats=removed_chats.deleted_count,
            deleted_sessions=removed_sessions.deleted_count,
        )
    except Exception as e:
        raise Exception(f"Error deleting user sessions and chats: {str(e)}")
def get_all_user_chats(self, user_id):
    """Return every session of a user together with that session's chats.

    Sessions are listed most recently accessed first; one ``chats`` query
    is issued per session, with chats ordered by ``timestamp`` ascending.

    Returns:
        A list of dicts with ``session_id``, ``title`` (``"New Chat"`` when
        the session has none), ``created_at``, ``last_accessed`` and
        ``chats``.

    Raises:
        Exception: If any query fails or a session lacks ``session_id``.
    """
    def chats_of(session_key):
        # Chats of one session, oldest first, without the _id field.
        selector = {"session_id": session_key, "user_id": user_id}
        return list(db.chats.find(selector, {"_id": 0}).sort("timestamp", 1))

    try:
        session_cursor = db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
        sessions = list(session_cursor.sort("last_accessed", -1))
        grouped = []
        for entry in sessions:
            history = chats_of(entry["session_id"])
            grouped.append({
                "session_id": entry["session_id"],
                "title": entry.get("title", "New Chat"),
                "created_at": entry.get("created_at"),
                "last_accessed": entry.get("last_accessed"),
                "chats": history,
            })
        return grouped
    except Exception as e:
        raise Exception(f"Error retrieving all user chats: {str(e)}")
def store_reset_token(self, email, token, expiration):
    """Upsert the password-reset token for an email address.

    The ``password_resets`` document matched by ``email`` gets ``token``
    and ``expiration`` set; it is inserted when missing.

    NOTE(review): a method with this same name and body is defined again
    further down in this file; within one class body the later definition
    is the one that takes effect.

    Raises:
        Exception: If the update fails.
    """
    try:
        reset_fields = {'token': token, 'expiration': expiration}
        db.password_resets.update_one({'email': email}, {'$set': reset_fields}, upsert=True)
    except Exception as e:
        raise Exception(f"Error storing reset token: {str(e)}")
def verify_reset_token(self, token):
    """Look up a reset token whose ``expiration`` is still in the future.

    NOTE(review): a method with this same name and body is defined again
    further down in this file; within one class body the later definition
    is the one that takes effect.

    Returns:
        The ``password_resets`` document for ``token`` if its
        ``expiration`` is later than the current UTC time, else the
        ``find_one`` miss value.

    Raises:
        Exception: If the query fails.
    """
    try:
        not_expired = {'$gt': datetime.now(timezone.utc)}
        return db.password_resets.find_one({'token': token, 'expiration': not_expired})
    except Exception as e:
        raise Exception(f"Error verifying reset token: {str(e)}")
def update_password(self, email, hashed_password):
    """Overwrite the ``password`` field of the user matched by ``email``.

    NOTE(review): ``update_password`` is defined again further down in this
    file with the second parameter named ``new_password``; within one class
    body that later definition takes effect, so keyword callers using
    ``hashed_password=`` should be checked.

    Raises:
        Exception: If the update fails.
    """
    try:
        selector = {'email': email}
        db.users.update_one(selector, {'$set': {'password': hashed_password}})
    except Exception as e:
        raise Exception(f"Error updating password: {str(e)}")
def delete_reset_token(self, token):
    """Delete one ``password_resets`` document whose ``token`` matches.

    Raises:
        Exception: If the delete fails.
    """
    try:
        selector = {'token': token}
        db.password_resets.delete_one(selector)
    except Exception as e:
        raise Exception(f"Error deleting reset token: {str(e)}")
def delete_account_permanently(self, username):
    """Delete a user's account document and the related data listed below.

    In order: chats and chat sessions (through
    ``delete_all_user_sessions_and_chats``), preferences, theme, location,
    questionnaires, and finally the ``users`` document itself.

    NOTE(review): ``username`` is passed as the ``user_id`` for chats,
    sessions and questionnaires - confirm those collections key on the
    username. The ``languages``, ``skin_schedules``, ``rag_interactions``,
    ``image_uploads`` and ``password_resets`` collections used elsewhere in
    this file are not touched here - confirm that is intended.

    Returns:
        A dict with ``success: True`` and ``deleted_data``, a per-category
        count of removed documents.

    Raises:
        Exception: If any delete fails; earlier deletes are not undone.
    """
    try:
        chat_cleanup = self.delete_all_user_sessions_and_chats(username)
        by_username = {'username': username}
        removed_preferences = db.preferences.delete_one(dict(by_username))
        removed_theme = db.user_themes.delete_one(dict(by_username))
        removed_location = db.locations.delete_one(dict(by_username))
        removed_questionnaires = db.questionnaires.delete_many({'user_id': username})
        removed_user = db.users.delete_one(dict(by_username))
        counts = {
            'chats': chat_cleanup['deleted_chats'],
            'chat_sessions': chat_cleanup['deleted_sessions'],
            'preferences': removed_preferences.deleted_count,
            'theme': removed_theme.deleted_count,
            'location': removed_location.deleted_count,
            'questionnaires': removed_questionnaires.deleted_count,
            'user_account': removed_user.deleted_count,
        }
        return {'success': True, 'deleted_data': counts}
    except Exception as e:
        raise Exception(f"Error deleting account permanently: {str(e)}")
def store_reset_token(self, email, token, expiration):
    """Upsert the password-reset token for an email address.

    The ``password_resets`` document matched by ``email`` gets ``token``
    and ``expiration`` set; it is inserted when missing.

    NOTE(review): this repeats an earlier definition of the same name in
    this file with an identical body.

    Raises:
        Exception: If the update fails.
    """
    try:
        selector = {'email': email}
        changes = {'$set': {'token': token, 'expiration': expiration}}
        db.password_resets.update_one(selector, changes, upsert=True)
    except Exception as e:
        raise Exception(f"Error storing reset token: {str(e)}")
def verify_reset_token(self, token):
    """Look up a reset token whose ``expiration`` is still in the future.

    NOTE(review): this repeats an earlier definition of the same name in
    this file with an identical body.

    Returns:
        The matching ``password_resets`` document, or the ``find_one``
        miss value when the token is unknown or expired.

    Raises:
        Exception: If the query fails.
    """
    try:
        selector = {
            'token': token,
            'expiration': {'$gt': datetime.now(timezone.utc)},
        }
        match = db.password_resets.find_one(selector)
    except Exception as e:
        raise Exception(f"Error verifying reset token: {str(e)}")
    return match
def update_password(self, email, new_password=None, hashed_password=None):
    """Overwrite the ``password`` field of the user matched by ``email``.

    This definition follows an earlier ``update_password`` in this file
    whose second parameter was named ``hashed_password``. Because the later
    definition takes effect, callers using that keyword would fail; both
    keywords are accepted here so either call style works. Positional calls
    are unchanged.

    NOTE(review): the value is stored as given - presumably callers pass an
    already hashed password; confirm.

    Args:
        email: Value matched against the ``email`` field.
        new_password: Value to store under ``password``.
        hashed_password: Alternative keyword for the same value; used only
            when ``new_password`` is ``None``.

    Raises:
        ValueError: If neither password argument is supplied.
        Exception: If the update fails.
    """
    password_value = new_password if new_password is not None else hashed_password
    if password_value is None:
        # Never write a null password over an existing one.
        raise ValueError("A password value is required")
    try:
        db.users.update_one(
            {'email': email},
            {'$set': {'password': password_value}}
        )
    except Exception as e:
        raise Exception(f"Error updating password: {str(e)}") from e
def get_user_language(self, user_id):
    """Return the stored language for a user, or ``None`` if none is stored.

    Returns:
        The ``language`` value of the user's ``languages`` document
        (``None`` when the document or the key is missing).

    Raises:
        Exception: If the query fails.
    """
    try:
        stored = db.languages.find_one({'user_id': user_id})
        if not stored:
            return None
        return stored.get('language')
    except Exception as e:
        raise Exception(f"Error retrieving user language: {str(e)}")
def set_user_language(self, user_id, language):
    """Upsert the language preference of a user.

    Returns:
        The dict that was written (``user_id``, ``language``,
        ``updated_at`` as the current UTC time).

    Raises:
        Exception: If the update fails.
    """
    try:
        record = dict(
            user_id=user_id,
            language=language,
            updated_at=datetime.now(timezone.utc),
        )
        db.languages.update_one({'user_id': user_id}, {'$set': record}, upsert=True)
        return record
    except Exception as e:
        raise Exception(f"Error setting user language: {str(e)}")
def delete_user_language(self, user_id):
    """Delete the ``languages`` document of a user.

    Returns:
        ``True`` when a document was deleted.

    Raises:
        Exception: If the delete fails.
    """
    try:
        outcome = db.languages.delete_one({'user_id': user_id})
        removed = outcome.deleted_count
        return removed > 0
    except Exception as e:
        raise Exception(f"Error deleting user language: {str(e)}")
def get_today_schedule(self, user_id):
    """Return the user's skin schedule created during the current UTC day.

    The day is the half-open window ``[midnight, next midnight)`` in UTC.
    The previous upper bound was 23:59:59 with zero microseconds compared
    with ``$lte``, which skipped documents created during the last second
    of the day.

    Args:
        user_id: Value matched against the ``user_id`` field.

    Returns:
        The result of ``find_one`` on ``skin_schedules``.

    Raises:
        Exception: If the query fails.
    """
    try:
        # Midnight UTC at the start of today and of the following day.
        day_start = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        next_day_start = day_start + timedelta(days=1)
        schedule = db.skin_schedules.find_one({
            "user_id": user_id,
            "created_at": {
                "$gte": day_start,
                "$lt": next_day_start
            }
        })
        return schedule
    except Exception as e:
        raise Exception(f"Error retrieving today's schedule: {str(e)}") from e
def save_schedule(self, user_id, schedule_data):
    """Store today's schedule for a user unless one already exists.

    When ``get_today_schedule`` finds a document, nothing is written and
    ``schedule_data`` is ignored.

    Returns:
        The ``_id`` of the existing or newly inserted schedule, as ``str``.

    Raises:
        Exception: If the lookup or the insert fails.
    """
    try:
        current = self.get_today_schedule(user_id)
        if not current:
            new_schedule = dict(
                user_id=user_id,
                schedule_data=schedule_data,
                created_at=datetime.now(timezone.utc),
            )
            inserted = db.skin_schedules.insert_one(new_schedule)
            return str(inserted.inserted_id)
        return str(current["_id"])
    except Exception as e:
        raise Exception(f"Error saving schedule: {str(e)}")
def get_last_seven_days_schedules(self, user_id):
    """Return the user's schedules created within the last seven days.

    The cut-off is exactly ``timedelta(days=7)`` before the current UTC
    time, not midnight-aligned.

    Returns:
        A list of ``skin_schedules`` documents, newest ``created_at`` first.

    Raises:
        Exception: If the query fails.
    """
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(days=7)
        selector = {"user_id": user_id, "created_at": {"$gte": cutoff}}
        recent = db.skin_schedules.find(selector).sort("created_at", -1)
        return list(recent)
    except Exception as e:
        raise Exception(f"Error fetching last 7 days schedules: {str(e)}")
def save_rag_interaction(self, user_id: str, session_id: str, context: str, query: str,
                         response: str, rag_start_time: datetime, rag_end_time: datetime):
    """Persist one RAG interaction in the ``rag_interactions`` collection.

    A fresh ``ObjectId`` rendered as ``str`` becomes ``interaction_id``;
    both timing values are converted with ``astimezone(timezone.utc)`` and
    ``created_at`` is the current UTC time.

    Returns:
        The generated ``interaction_id`` string.

    Raises:
        Exception: If building the document or the insert fails.
    """
    try:
        interaction_id = str(ObjectId())
        record = dict(
            interaction_id=interaction_id,
            user_id=user_id,
            session_id=session_id,
            context=context,
            query=query,
            response=response,
            rag_start_time=rag_start_time.astimezone(timezone.utc),
            rag_end_time=rag_end_time.astimezone(timezone.utc),
            created_at=datetime.now(timezone.utc),
        )
        db.rag_interactions.insert_one(record)
        return record["interaction_id"]
    except Exception as e:
        raise Exception(f"Error saving RAG interaction: {str(e)}")
def get_rag_interactions(
    self,
    user_id: Optional[str] = None,
    page: int = 1,
    page_size: int = 5
) -> dict:
    """Return one page of stored RAG interactions, newest first.

    Args:
        user_id: When truthy, only this user's interactions are returned;
            otherwise all interactions are.
        page: 1-based page number. Values below 1 are treated as 1, since
            they would otherwise give a negative ``skip``.
        page_size: Items per page. Values below 1 are treated as 1, since
            zero would divide by zero in the page count and would remove
            the limit from the query.

    Returns:
        A dict with ``total_interactions``, ``page``, ``page_size``,
        ``total_pages`` and ``results``; the three timestamp fields of each
        result are converted with ``isoformat()``.

    Raises:
        Exception: If the arguments cannot be converted to ``int`` or a
            query fails.
    """
    try:
        page = max(int(page), 1)
        page_size = max(int(page_size), 1)
        query_filter = {}
        if user_id:
            query_filter["user_id"] = user_id
        skip = (page - 1) * page_size
        total = db.rag_interactions.count_documents(query_filter)
        interactions = db.rag_interactions.find(
            query_filter,
            {"_id": 0}
        ).sort("created_at", DESCENDING).skip(skip).limit(page_size)
        result_list = []
        for interaction in interactions:
            # Convert the datetime fields to ISO strings before returning.
            for field in ("rag_start_time", "rag_end_time", "created_at"):
                interaction[field] = interaction[field].isoformat()
            result_list.append(interaction)
        return {
            "total_interactions": total,
            "page": page,
            "page_size": page_size,
            # Ceiling division without floats.
            "total_pages": (total + page_size - 1) // page_size,
            "results": result_list
        }
    except Exception as e:
        raise Exception(f"Error retrieving RAG interactions: {str(e)}") from e
def log_image_upload(self, user_id):
    """Record that a user uploaded an image at the current UTC time.

    Returns:
        ``True`` once the ``image_uploads`` insert has returned.

    Raises:
        Exception: If the insert fails.
    """
    try:
        entry = {"user_id": user_id, "timestamp": datetime.now(timezone.utc)}
        db.image_uploads.insert_one(entry)
    except Exception as e:
        raise Exception(f"Error logging image upload: {str(e)}")
    else:
        return True
def get_user_daily_uploads(self, user_id):
    """Count the images a user uploaded during the last 24 hours.

    The window is rolling: it starts ``timedelta(days=1)`` before the
    current UTC time.

    Returns:
        The value returned by ``count_documents``.

    Raises:
        Exception: If the count fails.
    """
    try:
        window_start = datetime.now(timezone.utc) - timedelta(days=1)
        selector = {"user_id": user_id, "timestamp": {"$gte": window_start}}
        return db.image_uploads.count_documents(selector)
    except Exception as e:
        raise Exception(f"Error retrieving user daily uploads: {str(e)}")
def get_user_last_upload_time(self, user_id):
    """Return the timestamp of the user's newest image upload.

    Returns:
        The ``timestamp`` of the ``image_uploads`` document with the
        greatest ``timestamp`` for this user, or ``None`` when the user has
        no uploads.

    Raises:
        Exception: If the query fails or the document lacks ``timestamp``.
    """
    try:
        newest_first = [("timestamp", DESCENDING)]
        latest = db.image_uploads.find_one({"user_id": user_id}, sort=newest_first)
        if not latest:
            return None
        return latest["timestamp"]
    except Exception as e:
        raise Exception(f"Error retrieving last upload time: {str(e)}")