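# NOTE: the following lines are repository-page header text captured with this file; kept as comments.
# derm-ai / app / database / database_query.py
# muhammadnoman76's picture
# update
# 75e2b6c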
from app.database.db import db
import re
from bson import ObjectId
from datetime import datetime, timezone, timedelta
from pymongo import DESCENDING
from typing import Optional
class DatabaseQuery:
    """Data-access helpers over the MongoDB collections reached through ``db``.

    Every method catches any error raised while it runs and re-raises a plain
    ``Exception`` whose message starts with a short description of the failed
    operation. The original error is chained as ``__cause__``.
    """

    # Compiled once; applied with ``fullmatch`` so the whole string must be an address.
    _EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}')

    # Boolean feature flags stored per user in the ``preferences`` collection.
    _PREFERENCE_FLAGS = (
        'keywords',
        'references',
        'websearch',
        'personalized_recommendations',
        'environmental_recommendations',
    )

    def __init__(self):
        pass

    # ------------------------------------------------------------------
    # Chat sessions and chats
    # ------------------------------------------------------------------

    def create_chat_session(self, chat_session):
        """Insert ``chat_session`` into the ``chat_sessions`` collection."""
        try:
            db.chat_sessions.insert_one(chat_session)
        except Exception as e:
            raise Exception(f"Error creating chat session: {e}") from e

    def get_user_chat_sessions(self, user_id):
        """Return the user's chat sessions (without ``_id``), most recently accessed first."""
        try:
            cursor = db.chat_sessions.find({"user_id": user_id}, {"_id": 0})
            return list(cursor.sort("last_accessed", -1))
        except Exception as e:
            raise Exception(f"Error retrieving user chat sessions: {e}") from e

    def create_chat(self, chat_data):
        """Insert ``chat_data`` into the ``chats`` collection and return ``True``."""
        try:
            db.chats.insert_one(chat_data)
            return True
        except Exception as e:
            raise Exception(f"Error creating chat: {e}") from e

    def update_last_accessed_time(self, session_id):
        """Set ``last_accessed`` of the given session to the current UTC time."""
        try:
            db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"last_accessed": datetime.now(timezone.utc)}},
            )
        except Exception as e:
            raise Exception(f"Error updating last accessed time: {e}") from e

    def get_session_chats(self, session_id, user_id):
        """Return the chats of one session owned by ``user_id``, oldest first, without ``_id``."""
        try:
            cursor = db.chats.find(
                {"session_id": session_id, "user_id": user_id},
                {"_id": 0},
            )
            return list(cursor.sort("timestamp", 1))
        except Exception as e:
            raise Exception(f"Error retrieving session chats: {e}") from e

    # ------------------------------------------------------------------
    # Users, authentication and verification
    # ------------------------------------------------------------------

    def get_user_by_identifier(self, identifier):
        """Return the user whose username or email equals ``identifier``, or ``None``."""
        try:
            return db.users.find_one(
                {'$or': [{'username': identifier}, {'email': identifier}]}
            )
        except Exception as e:
            raise Exception(f"Error retrieving user by identifier: {e}") from e

    def add_token_to_blacklist(self, jti):
        """Record the token identifier ``jti`` in the ``blacklist`` collection."""
        try:
            db.blacklist.insert_one({'jti': jti})
        except Exception as e:
            raise Exception(f"Error adding token to blacklist: {e}") from e

    def create_indexes(self):
        """Create the indexes that back the chat session and chat queries."""
        try:
            db.chat_sessions.create_index([("user_id", 1), ("last_accessed", -1)])
            db.chat_sessions.create_index([("session_id", 1)])
            db.chats.create_index([("session_id", 1), ("timestamp", 1)])
            db.chats.create_index([("user_id", 1)])
        except Exception as e:
            raise Exception(f"Error creating indexes: {e}") from e

    def check_chat_session(self, session_id):
        """Return ``True`` when a chat session with ``session_id`` exists (any owner)."""
        try:
            return db.chat_sessions.find_one({'session_id': session_id}) is not None
        except Exception as e:
            raise Exception(f"Error checking chat session: {e}") from e

    def get_user_profile(self, username):
        """Return the user document for ``username`` with the ``password`` field excluded."""
        try:
            return db.users.find_one({'username': username}, {'password': 0})
        except Exception as e:
            raise Exception(f"Error getting user profile: {e}") from e

    def update_user_profile(self, username, update_fields):
        """Apply ``update_fields`` with ``$set``; return ``True`` if a document was modified."""
        try:
            outcome = db.users.update_one(
                {'username': username},
                {'$set': update_fields},
            )
            return outcome.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating user profile: {e}") from e

    def delete_user_account(self, username):
        """Delete only the ``users`` document; return ``True`` if one was removed."""
        try:
            outcome = db.users.delete_one({'username': username})
            return outcome.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user account: {e}") from e

    def is_username_or_email_exists(self, username, email):
        """Return ``True`` when any user already has this username or this email."""
        try:
            match = db.users.find_one(
                {'$or': [{'username': username}, {'email': email}]}
            )
            return match is not None
        except Exception as e:
            raise Exception(f"Error checking if username or email exists: {e}") from e

    def create_or_update_temp_user(self, username, email, temp_user_data):
        """Upsert ``temp_user_data`` into ``temp_users``, matching on username or email."""
        try:
            db.temp_users.update_one(
                {'$or': [{'username': username}, {'email': email}]},
                {'$set': temp_user_data},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error creating/updating temp user: {e}") from e

    def get_temp_user_by_username(self, username):
        """Return the pending (temp) user document for ``username``, or ``None``."""
        try:
            return db.temp_users.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving temp user by username: {e}") from e

    def delete_temp_user(self, username):
        """Delete the pending (temp) user document for ``username``."""
        try:
            db.temp_users.delete_one({'username': username})
        except Exception as e:
            raise Exception(f"Error deleting temp user: {e}") from e

    def create_user_from_data(self, user_data):
        """Insert ``user_data`` into ``users`` and return the same dict object."""
        try:
            db.users.insert_one(user_data)
            return user_data
        except Exception as e:
            raise Exception(f"Error creating user from data: {e}") from e

    def create_user(self, username, email, hashed_password, name, age, created_at,
                    is_verified=False, verification_code=None, code_expiration=None):
        """Build a user document, insert it into ``users`` and return it.

        ``verification_code`` and ``code_expiration`` are stored only when both
        are truthy.
        """
        try:
            new_user = {
                'username': username,
                'email': email,
                'password': hashed_password,
                'name': name,
                'age': age,
                'created_at': created_at,
                'is_verified': is_verified,
            }
            if verification_code and code_expiration:
                new_user['verification_code'] = verification_code
                new_user['code_expiration'] = code_expiration
            db.users.insert_one(new_user)
            return new_user
        except Exception as e:
            raise Exception(f"Error creating user: {e}") from e

    def get_user_by_username(self, username):
        """Return the full user document for ``username`` (password included), or ``None``."""
        try:
            return db.users.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving user by username: {e}") from e

    def verify_user_email(self, username):
        """Mark the user verified and drop the verification code fields.

        Returns ``True`` if a document was modified.
        """
        try:
            outcome = db.users.update_one(
                {'username': username},
                {
                    '$set': {'is_verified': True},
                    '$unset': {'verification_code': '', 'code_expiration': ''},
                },
            )
            return outcome.modified_count > 0
        except Exception as e:
            raise Exception(f"Error verifying user email: {e}") from e

    def update_verification_code(self, username, verification_code, code_expiration):
        """Store a new verification code and expiry; return ``True`` if a document changed."""
        try:
            outcome = db.users.update_one(
                {'username': username},
                {'$set': {
                    'verification_code': verification_code,
                    'code_expiration': code_expiration,
                }},
            )
            return outcome.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating verification code: {e}") from e

    def is_valid_email(self, email):
        """Return ``True`` when ``email`` as a whole looks like an email address.

        The entire string must match, so trailing characters after the address
        are rejected, and top-level domains of any length >= 2 are accepted.
        A non-string argument raises the wrapped ``Exception``.
        """
        try:
            return self._EMAIL_PATTERN.fullmatch(email) is not None
        except Exception as e:
            raise Exception(f"Error validating email: {e}") from e

    # ------------------------------------------------------------------
    # Location
    # ------------------------------------------------------------------

    def add_or_update_location(self, username, location):
        """Upsert the user's location together with an ``updated_at`` UTC timestamp."""
        try:
            db.locations.update_one(
                {'username': username},
                {'$set': {'location': location, 'updated_at': datetime.now(timezone.utc)}},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error adding/updating location: {e}") from e

    def get_location(self, username):
        """Return the stored location document for ``username``, or ``None``."""
        try:
            return db.locations.find_one({'username': username})
        except Exception as e:
            raise Exception(f"Error retrieving location: {e}") from e

    # ------------------------------------------------------------------
    # Questionnaires
    # ------------------------------------------------------------------

    def submit_questionnaire(self, user_id, answers):
        """Insert a new questionnaire and return its id as a string."""
        try:
            questionnaire_data = {
                'user_id': user_id,
                'answers': answers,
                'created_at': datetime.now(timezone.utc),
                'updated_at': datetime.now(timezone.utc),
            }
            inserted = db.questionnaires.insert_one(questionnaire_data)
            return str(inserted.inserted_id)
        except Exception as e:
            raise Exception(f"Error submitting questionnaire: {e}") from e

    def get_latest_questionnaire(self, user_id):
        """Return the newest questionnaire of ``user_id`` (``_id`` as ``str``), or ``None``."""
        try:
            questionnaire = db.questionnaires.find_one(
                {'user_id': user_id},
                sort=[('created_at', -1)],
            )
            if questionnaire:
                questionnaire['_id'] = str(questionnaire['_id'])
            return questionnaire
        except Exception as e:
            raise Exception(f"Error getting latest questionnaire: {e}") from e

    def update_questionnaire(self, questionnaire_id, user_id, answers):
        """Replace the answers of a questionnaire owned by ``user_id``.

        Returns ``True`` if a document was modified. An id that cannot be
        converted to ``ObjectId`` surfaces as the wrapped ``Exception``.
        """
        try:
            outcome = db.questionnaires.update_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id},
                {'$set': {
                    'answers': answers,
                    'updated_at': datetime.now(timezone.utc),
                }},
            )
            return outcome.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating questionnaire: {e}") from e

    def delete_questionnaire(self, questionnaire_id, user_id):
        """Delete a questionnaire owned by ``user_id``; return ``True`` if one was removed."""
        try:
            outcome = db.questionnaires.delete_one(
                {'_id': ObjectId(questionnaire_id), 'user_id': user_id}
            )
            return outcome.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting questionnaire: {e}") from e

    def count_answered_questions(self, username):
        """Count ``questions`` documents of ``username`` whose ``answer`` is not ``None``."""
        try:
            return db.questions.count_documents({
                'username': username,
                'answer': {'$ne': None},
            })
        except Exception as e:
            raise Exception(f"Error counting answered questions: {e}") from e

    # ------------------------------------------------------------------
    # Preferences and theme
    # ------------------------------------------------------------------

    def get_user_preferences(self, username):
        """Return the user's preference flags; any flag not stored defaults to ``False``."""
        try:
            stored = db.preferences.find_one({'username': username}) or {}
            return {flag: stored.get(flag, False) for flag in self._PREFERENCE_FLAGS}
        except Exception as e:
            raise Exception(f"Error getting user preferences: {e}") from e

    def set_user_preferences(self, username, preferences):
        """Upsert the user's preference flags (coerced to ``bool``) and return the stored data."""
        try:
            preferences_data = {'username': username}
            for flag in self._PREFERENCE_FLAGS:
                preferences_data[flag] = bool(preferences.get(flag, False))
            preferences_data['updated_at'] = datetime.now(timezone.utc)
            db.preferences.update_one(
                {'username': username},
                {'$set': preferences_data},
                upsert=True,
            )
            return preferences_data
        except Exception as e:
            raise Exception(f"Error setting user preferences: {e}") from e

    def get_user_theme(self, username):
        """Return the stored theme name for ``username``, defaulting to ``'light'``."""
        try:
            user_theme = db.user_themes.find_one({'username': username})
            if not user_theme:
                return 'light'
            return user_theme.get('theme', 'light')
        except Exception as e:
            raise Exception(f"Error getting user theme: {e}") from e

    def set_user_theme(self, username, theme):
        """Upsert the user's theme and return the stored data.

        ``theme`` is treated as a flag: any truthy value stores ``"dark"``,
        any falsy value stores ``"light"``.
        """
        # NOTE(review): a non-empty string such as "light" is truthy and is
        # therefore stored as "dark" -- confirm callers always pass a boolean.
        try:
            theme_data = {
                'username': username,
                'theme': "dark" if theme else "light",
                'updated_at': datetime.now(timezone.utc),
            }
            db.user_themes.update_one(
                {'username': username},
                {'$set': theme_data},
                upsert=True,
            )
            return theme_data
        except Exception as e:
            raise Exception(f"Error setting user theme: {e}") from e

    # ------------------------------------------------------------------
    # Session ownership, renaming and deletion
    # ------------------------------------------------------------------

    def verify_session(self, session_id, user_id):
        """Return ``True`` when the session exists and belongs to ``user_id``."""
        try:
            owned = db.chat_sessions.find_one({
                "session_id": session_id,
                "user_id": user_id,
            })
            return owned is not None
        except Exception as e:
            raise Exception(f"Error verifying session: {e}") from e

    def update_chat_session_title(self, session_id, new_title):
        """Rename a chat session; return ``True`` if the title actually changed.

        Raises the wrapped ``Exception`` ("... Chat session not found") when no
        session matches ``session_id``.
        """
        # NOTE(review): the filter is not scoped by user; ownership is presumably
        # checked by the caller (e.g. via verify_session) -- confirm.
        try:
            outcome = db.chat_sessions.update_one(
                {"session_id": session_id},
                {"$set": {"title": new_title}},
            )
            if outcome.matched_count == 0:
                raise Exception("Chat session not found")
            return outcome.modified_count > 0
        except Exception as e:
            raise Exception(f"Error updating chat session title: {e}") from e

    def delete_chat_session(self, session_id, user_id):
        """Delete one session of ``user_id`` and all of its chats.

        Returns a dict with ``session_deleted`` (bool) and ``chats_deleted`` (count).
        """
        try:
            owner_filter = {"session_id": session_id, "user_id": user_id}
            session_result = db.chat_sessions.delete_one(dict(owner_filter))
            chats_result = db.chats.delete_many(dict(owner_filter))
            return {
                "session_deleted": session_result.deleted_count > 0,
                "chats_deleted": chats_result.deleted_count,
            }
        except Exception as e:
            raise Exception(f"Error deleting chat session and chats: {e}") from e

    def delete_all_user_sessions_and_chats(self, user_id):
        """Delete every chat and chat session of ``user_id``; return the deleted counts."""
        try:
            chats_result = db.chats.delete_many({"user_id": user_id})
            sessions_result = db.chat_sessions.delete_many({"user_id": user_id})
            return {
                "deleted_chats": chats_result.deleted_count,
                "deleted_sessions": sessions_result.deleted_count,
            }
        except Exception as e:
            raise Exception(f"Error deleting user sessions and chats: {e}") from e

    def get_all_user_chats(self, user_id):
        """Return every session of ``user_id`` with its chats embedded.

        Sessions are ordered most recently accessed first; the chats inside each
        session are ordered oldest first. A session without a title is reported
        as ``"New Chat"``.
        """
        try:
            sessions = db.chat_sessions.find(
                {"user_id": user_id},
                {"_id": 0},
            ).sort("last_accessed", -1)

            all_chats = []
            # One chats query per session (uses the session_id/timestamp index).
            for session in list(sessions):
                session_chats = list(db.chats.find(
                    {"session_id": session["session_id"], "user_id": user_id},
                    {"_id": 0},
                ).sort("timestamp", 1))
                all_chats.append({
                    "session_id": session["session_id"],
                    "title": session.get("title", "New Chat"),
                    "created_at": session.get("created_at"),
                    "last_accessed": session.get("last_accessed"),
                    "chats": session_chats,
                })
            return all_chats
        except Exception as e:
            raise Exception(f"Error retrieving all user chats: {e}") from e

    # ------------------------------------------------------------------
    # Password reset
    # ------------------------------------------------------------------

    def store_reset_token(self, email, token, expiration):
        """Upsert the password-reset token and its expiration for ``email``."""
        try:
            db.password_resets.update_one(
                {'email': email},
                {'$set': {'token': token, 'expiration': expiration}},
                upsert=True,
            )
        except Exception as e:
            raise Exception(f"Error storing reset token: {e}") from e

    def verify_reset_token(self, token):
        """Return the reset record for ``token`` if it has not expired yet, else ``None``."""
        try:
            return db.password_resets.find_one({
                'token': token,
                'expiration': {'$gt': datetime.now(timezone.utc)},
            })
        except Exception as e:
            raise Exception(f"Error verifying reset token: {e}") from e

    def update_password(self, email, new_password):
        """Store ``new_password`` as the ``password`` field of the user with ``email``.

        The value is written exactly as given.
        """
        # NOTE(review): no hashing happens here; presumably the caller passes an
        # already-hashed value -- confirm against the call sites.
        try:
            db.users.update_one(
                {'email': email},
                {'$set': {'password': new_password}},
            )
        except Exception as e:
            raise Exception(f"Error updating password: {e}") from e

    def delete_reset_token(self, token):
        """Delete the password-reset record that carries ``token``."""
        try:
            db.password_resets.delete_one({'token': token})
        except Exception as e:
            raise Exception(f"Error deleting reset token: {e}") from e

    def delete_account_permanently(self, username):
        """Delete the user and the data listed below; return per-collection counts.

        Removes chats, chat sessions, preferences, theme, location,
        questionnaires and finally the user document. ``username`` is also
        used as the ``user_id`` value for chats and questionnaires.
        """
        # NOTE(review): languages, skin_schedules, image_uploads, rag_interactions
        # and password_resets are not touched here -- confirm whether that is
        # intentional. The deletions are separate operations, not a transaction.
        try:
            chat_deletion_result = self.delete_all_user_sessions_and_chats(username)
            preferences_result = db.preferences.delete_one({'username': username})
            theme_result = db.user_themes.delete_one({'username': username})
            location_result = db.locations.delete_one({'username': username})
            questionnaire_result = db.questionnaires.delete_many({'user_id': username})
            user_result = db.users.delete_one({'username': username})
            return {
                'success': True,
                'deleted_data': {
                    'chats': chat_deletion_result['deleted_chats'],
                    'chat_sessions': chat_deletion_result['deleted_sessions'],
                    'preferences': preferences_result.deleted_count,
                    'theme': theme_result.deleted_count,
                    'location': location_result.deleted_count,
                    'questionnaires': questionnaire_result.deleted_count,
                    'user_account': user_result.deleted_count,
                },
            }
        except Exception as e:
            raise Exception(f"Error deleting account permanently: {e}") from e

    # ------------------------------------------------------------------
    # Language
    # ------------------------------------------------------------------

    def get_user_language(self, user_id):
        """Return the stored language of ``user_id``, or ``None`` when nothing is stored."""
        try:
            record = db.languages.find_one({'user_id': user_id})
            return record.get('language') if record else None
        except Exception as e:
            raise Exception(f"Error retrieving user language: {e}") from e

    def set_user_language(self, user_id, language):
        """Upsert the language of ``user_id`` and return the stored data."""
        try:
            language_data = {
                'user_id': user_id,
                'language': language,
                'updated_at': datetime.now(timezone.utc),
            }
            db.languages.update_one(
                {'user_id': user_id},
                {'$set': language_data},
                upsert=True,
            )
            return language_data
        except Exception as e:
            raise Exception(f"Error setting user language: {e}") from e

    def delete_user_language(self, user_id):
        """Delete the stored language of ``user_id``; return ``True`` if one was removed."""
        try:
            outcome = db.languages.delete_one({'user_id': user_id})
            return outcome.deleted_count > 0
        except Exception as e:
            raise Exception(f"Error deleting user language: {e}") from e

    # ------------------------------------------------------------------
    # Skin-care schedules
    # ------------------------------------------------------------------

    def get_today_schedule(self, user_id):
        """Return a schedule of ``user_id`` created during the current UTC day, or ``None``."""
        try:
            day_start = datetime.now(timezone.utc).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
            # Half-open window [midnight, next midnight): unlike an inclusive
            # 23:59:59 bound, it also covers the final second of the day.
            next_day_start = day_start + timedelta(days=1)
            return db.skin_schedules.find_one({
                "user_id": user_id,
                "created_at": {
                    "$gte": day_start,
                    "$lt": next_day_start,
                },
            })
        except Exception as e:
            raise Exception(f"Error retrieving today's schedule: {e}") from e

    def save_schedule(self, user_id, schedule_data):
        """Store today's schedule unless one exists; return the schedule id as a string.

        When a schedule for the current UTC day is already stored, its id is
        returned and ``schedule_data`` is not written.
        """
        try:
            existing_schedule = self.get_today_schedule(user_id)
            if existing_schedule:
                return str(existing_schedule["_id"])
            inserted = db.skin_schedules.insert_one({
                "user_id": user_id,
                "schedule_data": schedule_data,
                "created_at": datetime.now(timezone.utc),
            })
            return str(inserted.inserted_id)
        except Exception as e:
            raise Exception(f"Error saving schedule: {e}") from e

    def get_last_seven_days_schedules(self, user_id):
        """Return the schedules of ``user_id`` created in the last 7 days, newest first."""
        try:
            seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
            cursor = db.skin_schedules.find({
                "user_id": user_id,
                "created_at": {"$gte": seven_days_ago},
            }).sort("created_at", -1)
            return list(cursor)
        except Exception as e:
            raise Exception(f"Error fetching last 7 days schedules: {e}") from e

    # ------------------------------------------------------------------
    # RAG interactions
    # ------------------------------------------------------------------

    def save_rag_interaction(self, user_id: str, session_id: str, context: str, query: str,
                             response: str, rag_start_time: datetime, rag_end_time: datetime):
        """Store one RAG interaction and return its generated ``interaction_id``.

        The start and end times are converted to UTC before being stored.
        """
        try:
            interaction = {
                "interaction_id": str(ObjectId()),
                "user_id": user_id,
                "session_id": session_id,
                "context": context,
                "query": query,
                "response": response,
                "rag_start_time": rag_start_time.astimezone(timezone.utc),
                "rag_end_time": rag_end_time.astimezone(timezone.utc),
                "created_at": datetime.now(timezone.utc),
            }
            db.rag_interactions.insert_one(interaction)
            return interaction["interaction_id"]
        except Exception as e:
            raise Exception(f"Error saving RAG interaction: {e}") from e

    def get_rag_interactions(
        self,
        user_id: Optional[str] = None,
        page: int = 1,
        page_size: int = 5
    ) -> dict:
        """Return one page of RAG interactions, newest first.

        Args:
            user_id: When truthy, only this user's interactions are returned.
            page: 1-based page number; must be >= 1.
            page_size: Number of results per page; must be >= 1.

        Returns:
            A dict with ``total_interactions``, ``page``, ``page_size``,
            ``total_pages`` and ``results`` (timestamps as ISO-8601 strings).

        Raises:
            Exception: On invalid paging arguments or any database error.
        """
        try:
            # Fail before querying: a negative skip is rejected by the driver and
            # a zero page size would divide by zero when computing total_pages.
            if page < 1 or page_size < 1:
                raise ValueError("page and page_size must be positive integers")

            query_filter = {}
            if user_id:
                query_filter["user_id"] = user_id

            total = db.rag_interactions.count_documents(query_filter)
            cursor = (
                db.rag_interactions.find(query_filter, {"_id": 0})
                .sort("created_at", DESCENDING)
                .skip((page - 1) * page_size)
                .limit(page_size)
            )

            result_list = []
            for interaction in cursor:
                for field in ("rag_start_time", "rag_end_time", "created_at"):
                    interaction[field] = interaction[field].isoformat()
                result_list.append(interaction)

            return {
                "total_interactions": total,
                "page": page,
                "page_size": page_size,
                "total_pages": (total + page_size - 1) // page_size,  # ceiling division
                "results": result_list,
            }
        except Exception as e:
            raise Exception(f"Error retrieving RAG interactions: {e}") from e

    # ------------------------------------------------------------------
    # Image uploads
    # ------------------------------------------------------------------

    def log_image_upload(self, user_id):
        """Log an image upload for a user with a timezone-aware UTC timestamp."""
        try:
            db.image_uploads.insert_one({
                "user_id": user_id,
                "timestamp": datetime.now(timezone.utc),
            })
            return True
        except Exception as e:
            raise Exception(f"Error logging image upload: {e}") from e

    def get_user_daily_uploads(self, user_id):
        """Get number of images uploaded by user in the last 24 hours."""
        try:
            window_start = datetime.now(timezone.utc) - timedelta(days=1)
            return db.image_uploads.count_documents({
                "user_id": user_id,
                "timestamp": {"$gte": window_start},
            })
        except Exception as e:
            raise Exception(f"Error retrieving user daily uploads: {e}") from e

    def get_user_last_upload_time(self, user_id):
        """Get the timestamp of user's most recent image upload, or ``None`` if there is none."""
        try:
            last_upload = db.image_uploads.find_one(
                {"user_id": user_id},
                sort=[("timestamp", DESCENDING)],
            )
            return last_upload["timestamp"] if last_upload else None
        except Exception as e:
            raise Exception(f"Error retrieving last upload time: {e}") from e