# derm-ai / app / services / chat_processor.py
# muhammadnoman76's picture
# update
# 29ce710
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from yake import KeywordExtractor
from app.services.chathistory import ChatSession
from app.services.websearch import WebSearch
from app.services.llm_model import Model
from app.services.environmental_condition import EnvironmentalData
from app.services.prompts import *
from app.services.vector_database_search import VectorDatabaseSearch
import re
# Vector database search client, instantiated once when this module is imported.
# ChatProcessor queries it (vectordb.search) when web search is disabled for the user.
vectordb = VectorDatabaseSearch()
class ChatProcessor:
    """Answer a chat query using either web search or the vector database.

    One instance is bound to a user token and (optionally) an existing chat
    session. ``process_chat`` rewrites the query using the chat history,
    gathers context from the web or from ``vectordb`` depending on the user's
    preferences, asks the LLM for a response and stores the exchange through
    the ``ChatSession``.
    """

    # Vector-database hits at or below this confidence are ignored.
    _MIN_CONFIDENCE = 60
    # Number of hits requested from the vector database.
    _VECTOR_TOP_K = 3
    # Retrieved context shorter than this is replaced by the fallback message.
    _MIN_CONTEXT_LENGTH = 10
    _NO_CONTEXT_MESSAGE = "There is no context found unfortunately"
    # Headings after which the response is cut before it is stored via save_details.
    _EXTRA_SECTIONS_RE = re.compile(
        r'(## Personal Recommendations|## Environmental Considerations)'
    )

    def __init__(self, token: str, session_id: Optional[str] = None, num_results: int = 3, num_images: int = 3):
        """Create the chat session, environmental data and web search helpers.

        Args:
            token: User token handed to ``ChatSession``.
            session_id: Existing session id, or ``None`` to have one created
                later by ``ensure_valid_session``.
            num_results: Number of web results requested from ``WebSearch``.
            num_images: Maximum number of images requested from ``WebSearch``.
        """
        self.token = token
        self.session_id = session_id
        self.num_results = num_results
        self.num_images = num_images
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city()
        # EnvironmentalData receives an empty string when no city is available.
        self.environment_data = EnvironmentalData(self.user_city or '')
        self.web_searcher = WebSearch(num_results=num_results, max_images=num_images)
        self.web_search_required = True

    def extract_keywords_yake(self, text: str, language: str, max_ngram_size: int = 2, num_keywords: int = 4) -> list:
        """Return up to ``num_keywords`` YAKE keywords extracted from ``text``.

        Args:
            text: Text to extract keywords from.
            language: Language name; ``"urdu"`` (any case) selects the ``"ur"``
                language code, everything else uses ``"en"``.
            max_ngram_size: Maximum n-gram size passed to YAKE.
            num_keywords: Maximum number of keywords to return.

        Returns:
            A list with the first element of each YAKE result; empty when
            ``text`` is empty or blank.
        """
        # Nothing to extract from: avoid building an extractor at all.
        if not text or not text.strip():
            return []

        lang_code = "ur" if language.lower() == "urdu" else "en"
        kw_extractor = KeywordExtractor(
            lan=lang_code,
            n=max_ngram_size,
            top=num_keywords,
            features=None
        )
        return [kw[0] for kw in kw_extractor.extract_keywords(text)]

    def ensure_valid_session(self, title: Optional[str] = None) -> str:
        """Return a usable session id, creating a new session when needed.

        A new session is created when no (non-blank) session id is set, when
        ``validate_session`` returns a falsy value, or when it raises
        ``ValueError``.

        Args:
            title: Title forwarded to session validation / creation.

        Returns:
            The session id now stored on ``self.session_id``.
        """
        session_is_valid = False
        if self.session_id and self.session_id.strip():
            try:
                session_is_valid = bool(
                    self.chat_session.validate_session(self.session_id, title=title)
                )
            except ValueError:
                session_is_valid = False

        if not session_is_valid:
            self.chat_session.create_new_session(title=title)
            self.session_id = self.chat_session.session_id
        return self.session_id

    def _build_prompt(self, name: Any, age: Any, history: Any, context: str, enhanced_query: Any,
                      env_recommendations: Any, personalized_recommendations: Any) -> str:
        """Fill the prompt template that matches the user's recommendation preferences."""
        if env_recommendations and personalized_recommendations:
            return ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                history=history,
                user_details=self.chat_session.get_personalized_recommendation(),
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        if personalized_recommendations:
            return PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                user_details=self.chat_session.get_personalized_recommendation(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        if env_recommendations:
            return ENVIRONMENTAL_PROMPT.format(
                user_name=name,
                user_age=age,
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        return DEFAULT_PROMPT.format(
            previous_history=history,
            context=context,
            current_query=enhanced_query
        )

    def _chat_with_websearch(self, enhanced_query: Any, build_prompt: Any, language: str,
                             keywords_permission: Any, reference_permission: Any) -> Dict[str, Any]:
        """Answer from web search results and save the chat message."""
        # Text and image searches run side by side on worker threads.
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_web = executor.submit(self.web_searcher.search, enhanced_query)
            future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
            web_results = future_web.result()
            image_results = future_images.result()

        context_parts = []
        references = []
        for idx, result in enumerate(web_results, 1):
            # Results without text contribute neither context nor a reference.
            if result['text']:
                context_parts.append(f"From Source {idx}: {result['text']}\n")
                references.append(result['link'])
        context = "\n".join(context_parts)

        prompt = build_prompt(context)
        response = Model().llm(prompt, enhanced_query)

        keywords = ""
        if keywords_permission:
            keywords = self.extract_keywords_yake(response, language=language)
        if not reference_permission:
            references = ""

        chat_data = {
            "query": enhanced_query,
            "response": response,
            "references": references,
            "page_no": "",
            "keywords": keywords,
            "images": image_results,
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.chat_session.session_id
        }
        if not self.chat_session.save_chat(chat_data):
            raise ValueError("Failed to save chat message")
        return chat_data

    def _chat_with_vector_db(self, query: str, enhanced_query: Any, build_prompt: Any, language: str,
                             keywords_permission: Any, reference_permission: Any) -> Dict[str, Any]:
        """Answer from vector-database context and save both RAG details and the chat message."""
        # The image search is kept on a worker thread, as before; it finishes
        # before the RAG timing window starts.
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
            image_results = future_images.result()

        start_time = datetime.now(timezone.utc)
        results = vectordb.search(query=enhanced_query, top_k=self._VECTOR_TOP_K)

        attach_image = False
        context_parts = []
        references = []
        seen_pages = set()
        for result in results:
            if result['confidence'] > self._MIN_CONFIDENCE:
                context_parts.append(f"Content: {result['content']}")
                page = result['page']
                # Each page is referenced only once.
                if page not in seen_pages:
                    references.append(f"Source: {result['source']}, Page: {page}")
                    seen_pages.add(page)
                attach_image = True

        context = "\n".join(context_parts)
        if len(context) < self._MIN_CONTEXT_LENGTH:
            context = self._NO_CONTEXT_MESSAGE

        prompt = build_prompt(context)
        # NOTE(review): this path sends the raw ``query`` to the model while the
        # web-search path sends ``enhanced_query`` - confirm which is intended.
        response = Model().llm(prompt, query)
        end_time = datetime.now(timezone.utc)

        keywords = ""
        if keywords_permission:
            keywords = self.extract_keywords_yake(response, language=language)
        if not reference_permission:
            references = ""
        # Without any confident hit, neither images nor keywords are returned.
        if not attach_image:
            image_results = ""
            keywords = ""

        chat_data = {
            "query": enhanced_query,
            "response": response,
            "references": references,
            "page_no": "",
            "keywords": keywords,
            "images": image_results,
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.chat_session.session_id
        }

        # The stored RAG details exclude everything from the first
        # personal / environmental section heading onwards.
        match = self._EXTRA_SECTIONS_RE.search(response)
        truncated_response = response[:match.start()].strip() if match else response

        if not self.chat_session.save_details(
            session_id=self.session_id,
            context=context,
            query=enhanced_query,
            response=truncated_response,
            rag_start_time=start_time,
            rag_end_time=end_time
        ):
            raise ValueError("Failed to save the RAG details")
        if not self.chat_session.save_chat(chat_data):
            raise ValueError("Failed to save chat message")
        return chat_data

    def process_chat(self, query: str) -> Dict[str, Any]:
        """Answer ``query`` and persist the exchange.

        The query is first rewritten by the model using the chat history; the
        rewritten query is used as the session title, for retrieval and in
        the prompt. Depending on the user's ``websearch`` preference the
        context comes from web search or from the vector database.

        Args:
            query: The user's raw message.

        Returns:
            The saved chat record (query, response, references, page_no,
            keywords, images, context, timestamp, session_id). If anything
            raises, a dict with ``error``, ``query``, a generic ``response``
            message and ``timestamp`` is returned instead.
        """
        try:
            profile = self.chat_session.get_name_and_age()
            name = profile['name']
            age = profile['age']

            self.chat_session.load_chat_history()
            self.chat_session.update_title(self.session_id, query)
            history = self.chat_session.format_history()

            history_based_prompt = HISTORY_BASED_PROMPT.format(history=history, query=query)
            enhanced_query = Model().send_message_openrouter(history_based_prompt)
            self.session_id = self.ensure_valid_session(title=enhanced_query)

            permission = self.chat_session.get_user_preferences()
            websearch_enabled = permission.get('websearch', False)
            env_recommendations = permission.get('environmental_recommendations', False)
            personalized_recommendations = permission.get('personalized_recommendations', False)
            keywords_permission = permission.get('keywords', False)
            reference_permission = permission.get('references', False)

            language = self.chat_session.get_language().lower()
            language_prompt = LANGUAGE_RESPONSE_PROMPT.format(language=language)

            def build_prompt(context: str) -> str:
                # Same template selection for both retrieval paths, followed
                # by the response-language instruction.
                return self._build_prompt(
                    name, age, history, context, enhanced_query,
                    env_recommendations, personalized_recommendations
                ) + language_prompt

            if websearch_enabled:
                return self._chat_with_websearch(
                    enhanced_query, build_prompt, language,
                    keywords_permission, reference_permission
                )
            return self._chat_with_vector_db(
                query, enhanced_query, build_prompt, language,
                keywords_permission, reference_permission
            )
        except Exception as e:
            # Boundary handler: callers always receive a dict, never an exception.
            return {
                "error": str(e),
                "query": query,
                "response": "Sorry, there was an error processing your request.",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

    def web_search(self, query: str) -> Dict[str, Any]:
        """Process ``query`` through ``process_chat`` and return its result."""
        return self.process_chat(query=query)