Spaces:
Running
Running
from app.database.database_query import DatabaseQuery | |
import os | |
import jwt | |
from dotenv import load_dotenv | |
from typing import Optional, Dict, List | |
from bson import ObjectId | |
from datetime import datetime | |
load_dotenv() | |
jwt_secret_key = os.getenv('JWT_SECRET_KEY') | |
query = DatabaseQuery() | |
class ChatSession: | |
def __init__(self, token: str , session_id: str): | |
self.token = token | |
self.session_id = session_id | |
self.chats = [] | |
self.identity = self._decode_token(token) | |
self.query = query | |
def _decode_token(self, token: str) -> str: | |
try: | |
decoded_token = jwt.decode(token, jwt_secret_key, algorithms=["HS256"]) | |
identity = decoded_token['sub'] | |
return identity | |
except jwt.ExpiredSignatureError: | |
raise ValueError("The token has expired.") | |
except jwt.InvalidTokenError: | |
raise ValueError("Invalid token.") | |
except Exception as e: | |
raise ValueError(f"Failed to decode token: {e}") | |
def get_user_preferences(self) -> dict: | |
current_user = self.identity | |
preferences = self.query.get_user_preferences(current_user) | |
if preferences is not None: | |
return preferences | |
raise ValueError("Failed to fetch user preferences.") | |
def get_personalized_recommendation(self) -> Optional[str]: | |
current_user = self.identity | |
response = self.query.get_latest_questionnaire(current_user) | |
if not response: | |
return None | |
answers = response.get('answers', {}) | |
if not answers: | |
return None | |
def format_answer(answer): | |
if answer is None: | |
return None | |
if isinstance(answer, str): | |
stripped_answer = answer.strip().lower() | |
if stripped_answer in ['none', ''] or len(stripped_answer) < 3: | |
return None | |
return stripped_answer | |
if isinstance(answer, list): | |
filtered_answer = [item for item in answer if "Other" not in item | |
and item.strip().lower() not in ['none', ''] | |
and len(item.strip()) >= 3] | |
return ", ".join(filtered_answer) if filtered_answer else None | |
return answer | |
questions = { | |
"skinType": "How would you describe your skin type?", | |
"currentConditions": "Do you currently have any skin conditions?", | |
"autoImmuneConditions": "Do you have a history of autoimmune or hormonal conditions?", | |
"allergies": "Do you have any known allergies to skincare ingredients?", | |
"medications": "Are you currently taking any medications that might affect your skin?", | |
"hormonal": "Do you experience hormonal changes that affect your skin?", | |
"diet": "Have you noticed any foods that trigger skin reactions?", | |
"diabetes": "Do you have diabetes?", | |
"outdoorTime": "How much time do you spend outdoors during the day?", | |
"sleep": "How many hours of sleep do you get on average?", | |
"familyHistory": "Do you have a family history of skin conditions?", | |
"products": "What skincare products are you currently using?" | |
} | |
valid_answers = {key: format_answer(answers.get(key)) | |
for key in questions | |
if format_answer(answers.get(key)) is not None} | |
if not valid_answers: | |
return None | |
formatted_response = [] | |
for key, answer in valid_answers.items(): | |
question = questions.get(key) | |
formatted_response.append(f"question: {question}\nUser answer: {answer}") | |
profile = self.get_profile() | |
name = profile.get('name', 'Unknown') | |
age = profile.get('age', 'Unknown') | |
return f"user name: {name}\nuser age: {age}\n\n" + "\n\n".join(formatted_response) | |
def create_new_session(self, title: str = None) -> bool: | |
current_user = self.identity | |
session_id = str(ObjectId()) | |
chat_session = { | |
"user_id": current_user, | |
"session_id": session_id, | |
"created_at": datetime.utcnow(), | |
"last_accessed": datetime.utcnow(), | |
"title": title if title else "New Chat" | |
} | |
try: | |
self.query.create_chat_session(chat_session) | |
self.session_id = session_id | |
return True | |
except Exception as e: | |
raise Exception(f"Failed to create session: {str(e)}") | |
def verify_session_exists(self, session_id: str) -> bool: | |
current_user = self.identity | |
return self.query.verify_session(session_id, current_user) | |
def validate_session(self, session_id: Optional[str] = None, title: str = None) -> bool: | |
if not session_id or not session_id.strip(): | |
return self.create_new_session(title=title) | |
if self.verify_session_exists(session_id): | |
self.session_id = session_id | |
return self.load_chat_history() | |
return self.create_new_session(title=title) | |
def load_session(self, session_id: str) -> bool: | |
return self.validate_session(session_id) | |
def load_chat_history(self) -> bool: | |
if not self.session_id: | |
raise ValueError("No session ID provided.") | |
current_user = self.identity | |
try: | |
self.chats = self.query.get_session_chats(self.session_id, current_user) | |
return True | |
except Exception as e: | |
raise Exception(f"Failed to load chat history: {str(e)}") | |
def get_chat_history(self) -> List[Dict]: | |
return self.chats | |
def format_history(self) -> str: | |
formatted_chats = [] | |
for chat in self.chats: | |
query = chat.get('query', '').strip() | |
response = chat.get('response', '').strip() | |
if query and response: | |
formatted_chats.append(f"User: {query}") | |
formatted_chats.append(f"dermatologist Dr DermAI: {response}") | |
return "\n".join(formatted_chats) if formatted_chats else "" | |
def save_chat(self, chat_data: Dict) -> bool: | |
if not self.session_id: | |
raise ValueError("No active session to save chat") | |
current_user = self.identity | |
data = { | |
"user_id": current_user, | |
"session_id": self.session_id, | |
"query": chat_data.get("query", "").strip(), | |
"response": chat_data.get("response", "").strip(), | |
"references": chat_data.get("references", []), | |
"page_no": chat_data.get("page_no", []), | |
"keywords": chat_data.get("keywords", []), | |
"images": chat_data.get("images", []), | |
"context": chat_data.get("context", ""), | |
"timestamp": datetime.utcnow(), | |
"chat_id": str(ObjectId()) | |
} | |
try: | |
if self.query.create_chat(data): | |
self.query.update_last_accessed_time(self.session_id) | |
self.chats.append(data) | |
return True | |
return False | |
except Exception as e: | |
raise Exception(f"Failed to save chat: {str(e)}") | |
def get_name_and_age(self): | |
current_user = self.identity | |
try: | |
user_profile = self.query.get_user_profile(current_user) | |
return user_profile | |
except Exception as e: | |
raise Exception(f"Failed to get user name and age: {str(e)}") | |
def get_profile(self): | |
current_user = self.identity | |
try: | |
user = query.get_user_profile(current_user) | |
if not user: | |
return {'error': 'User not found'} | |
return { | |
'username': user['username'], | |
'email': user['email'], | |
'name': user['name'], | |
'age': user['age'], | |
'created_at': user['created_at'] | |
} | |
except Exception as e: | |
return {'error': str(e)} | |
def update_title(self , sessionId , new_title): | |
query.update_chat_session_title(sessionId, new_title) | |
def get_city(self) -> Optional[str]: | |
current_user = self.identity | |
try: | |
location_data = self.query.get_location(current_user) | |
if location_data and 'location' in location_data: | |
return location_data['location'] | |
return None | |
except Exception as e: | |
raise Exception(f"Failed to get user city: {str(e)}") | |
def get_language(self) -> Optional[str]: | |
current_user = self.identity | |
try: | |
language = self.query.get_user_language(current_user) | |
if not language : | |
return "english" | |
else: | |
return language | |
return None | |
except Exception as e: | |
raise Exception(f"Failed to get user city: {str(e)}") | |
def get_language(self) -> Optional[str]: | |
current_user = self.identity | |
try: | |
language = self.query.get_user_language(current_user) | |
if not language : | |
return "english" | |
else: | |
return language | |
return None | |
except Exception as e: | |
raise Exception(f"Failed to get user city: {str(e)}") | |
def get_today_schedule(self): | |
data = self.query.get_today_schedule(user_id=self.identity) | |
if not data: | |
return "" | |
return data | |
def save_schedule(self, schedule_data): | |
return self.query.save_schedule(user_id=self.identity, schedule_data=schedule_data) | |
def get_last_seven_days_schedules(self): | |
data = self.query.get_last_seven_days_schedules(user_id=self.identity) | |
if not data: | |
return "" | |
return data | |
def save_details(self, session_id, context, query, response, rag_start_time, rag_end_time): | |
data = self.query.save_rag_interaction( | |
user_id="admin", | |
session_id=session_id, | |
context=context, | |
query=query, | |
response=response, | |
rag_start_time=rag_start_time, | |
rag_end_time=rag_end_time | |
) | |
return data | |
def get_save_details(self, page: int, page_size: int) -> dict: | |
data = self.query.get_rag_interactions( | |
user_id="admin", | |
page=page, | |
page_size=page_size | |
) | |
return data | |
def log_user_image_upload(self): | |
"""Log an image upload for the current user""" | |
try: | |
return self.query.log_image_upload(self.identity) | |
except Exception as e: | |
raise ValueError(f"Failed to log image upload: {e}") | |
def get_user_daily_uploads(self): | |
"""Get number of images uploaded by current user in the last 24 hours""" | |
try: | |
return self.query.get_user_daily_uploads(self.identity) | |
except Exception as e: | |
raise ValueError(f"Failed to get user daily uploads: {e}") | |
def get_user_last_upload_time(self): | |
"""Get the timestamp of current user's most recent image upload""" | |
try: | |
return self.query.get_user_last_upload_time(self.identity) | |
except Exception as e: | |
raise ValueError(f"Failed to get user's last upload time: {e}") | |