# derm-ai/app/services/chat_processor.py
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Optional, Dict, Any

from yake import KeywordExtractor

from app.services.chathistory import ChatSession
from app.services.websearch import WebSearch
from app.services.llm_model import Model
from app.services.environmental_condition import EnvironmentalData
from app.services.prompts import *
from app.services.vector_database_search import VectorDatabaseSearch

# Module-level vector database client shared by every ChatProcessor instance.
vectordb = VectorDatabaseSearch()


class ChatProcessor:
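    """Orchestrates a single chat turn: loads session state, enriches the query
    with conversation history, gathers context from web search or the local
    vector database, builds the appropriate prompt, and persists the response."""
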
    def __init__(self, token: str, session_id: Optional[str] = None, num_results: int = 3, num_images: int = 3):
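        """Bind the user token and session, then prepare the supporting services
        (chat history, environmental data, web search)."""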
        self.token = token
        self.session_id = session_id
        self.num_results = num_results
        self.num_images = num_images
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city()
        city = self.user_city if self.user_city else ''
        self.environment_data = EnvironmentalData(city)
        self.web_searcher = WebSearch(num_results=num_results, max_images=num_images)

    def extract_keywords_yake(self, text: str, language: str, max_ngram_size: int = 2, num_keywords: int = 4) -> list:
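        """Extract up to ``num_keywords`` keyphrases from ``text`` with YAKE,
        switching to the Urdu model when the session language is Urdu."""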
        lang_code = "en"
        if language.lower() == "urdu":
            lang_code = "ur"
        kw_extractor = KeywordExtractor(
            lan=lang_code,
            n=max_ngram_size,
            top=num_keywords,
            features=None
        )
        keywords = kw_extractor.extract_keywords(text)
        return [kw[0] for kw in keywords]

    def ensure_valid_session(self, title: str = None) -> str:
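        """Return a usable session id, creating a new session when the current
        one is missing, blank, or fails validation."""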
        if not self.session_id or not self.session_id.strip():
            self.chat_session.create_new_session(title=title)
            self.session_id = self.chat_session.session_id
        else:
            try:
                if not self.chat_session.validate_session(self.session_id, title=title):
                    self.chat_session.create_new_session(title=title)
                    self.session_id = self.chat_session.session_id
            except ValueError:
                self.chat_session.create_new_session(title=title)
                self.session_id = self.chat_session.session_id
        return self.session_id

    def process_chat(self, query: str) -> Dict[str, Any]:
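        """Process one user query end to end and return the saved chat record,
        or an error payload if any step fails."""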
        try:
            profile = self.chat_session.get_name_and_age()
            name = profile['name']
            age = profile['age']
            self.chat_session.load_chat_history()
            self.chat_session.update_title(self.session_id, query)
            history = self.chat_session.format_history()
            # Rewrite the raw query in the light of the conversation history.
            history_based_prompt = HISTORY_BASED_PROMPT.format(history=history, query=query)
            enhanced_query = Model().send_message_openrouter(history_based_prompt)
            self.session_id = self.ensure_valid_session(title=enhanced_query)
            # User preferences decide which context source and extras to use.
            permission = self.chat_session.get_user_preferences()
            websearch_enabled = permission.get('websearch', False)
            env_recommendations = permission.get('environmental_recommendations', False)
            personalized_recommendations = permission.get('personalized_recommendations', False)
            keywords_permission = permission.get('keywords', False)
            reference_permission = permission.get('references', False)
            language = self.chat_session.get_language().lower()
            language_prompt = LANGUAGE_RESPONSE_PROMPT.format(language=language)
            if websearch_enabled:
                return self._process_with_websearch(enhanced_query, query, name, age, history, language,
                                                    env_recommendations, personalized_recommendations,
                                                    keywords_permission, reference_permission, language_prompt)
            else:
                return self._process_with_vectordb(enhanced_query, query, name, age, history, language,
                                                   env_recommendations, personalized_recommendations,
                                                   keywords_permission, reference_permission, language_prompt)
        except Exception as e:
            return {
                "error": str(e),
                "query": query,
                "response": "Sorry, there was an error processing your request.",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

    def _process_with_websearch(self, enhanced_query, original_query, name, age, history, language,
                                env_recommendations, personalized_recommendations,
                                keywords_permission, reference_permission, language_prompt):
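        """Answer the query using live web results as context, run the LLM with the
        prompt that matches the user's recommendation settings, and save the chat."""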
        # Fetch web text and images concurrently.
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_web = executor.submit(self.web_searcher.search, enhanced_query)
            future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
            web_results = future_web.result()
            image_results = future_images.result()

        context_parts = []
        references = []
        for idx, result in enumerate(web_results, 1):
            if result['text']:
                context_parts.append(f"From Source {idx}: {result['text']}\n")
                references.append(result['link'])
        context = "\n".join(context_parts)

        # Pick the prompt template that matches the user's preference flags.
        if env_recommendations and personalized_recommendations:
            prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                history=history,
                user_details=self.chat_session.get_personalized_recommendation(),
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        elif personalized_recommendations:
            prompt = PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                user_details=self.chat_session.get_personalized_recommendation(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        elif env_recommendations:
            prompt = ENVIRONMENTAL_PROMPT.format(
                user_name=name,
                user_age=age,
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        else:
            prompt = DEFAULT_PROMPT.format(
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )

        prompt = prompt + language_prompt
        response = Model().llm(prompt, enhanced_query)
        keywords = self.extract_keywords_yake(response, language=language) if keywords_permission else ""
        references = references if reference_permission else ""
        chat_data = {
            "query": enhanced_query,
            "response": response,
            "references": references,
            "page_no": "",
            "keywords": keywords,
            "images": image_results,
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.chat_session.session_id
        }
        if not self.chat_session.save_chat(chat_data):
            raise ValueError("Failed to save chat message")
        return chat_data

    def _process_with_vectordb(self, enhanced_query, original_query, name, age, history, language,
                               env_recommendations, personalized_recommendations,
                               keywords_permission, reference_permission, language_prompt):
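        """Answer the query using the local vector database as context, attach web
        images only when a confident match is found, and persist both the chat
        record and the RAG timing details."""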
        attach_image = False
        # Image search can run in the background while the vector lookup happens here.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
            image_results = future_images.result()

        start_time = datetime.now(timezone.utc)
        results = vectordb.search(query=enhanced_query, top_k=3)
        context_parts = []
        references = []
        seen_pages = set()
        for result in results:
            confidence = result['confidence']
            if confidence > 60:
                context_parts.append(f"Content: {result['content']}")
                page = result['page']
                if page not in seen_pages:
                    references.append(f"Source: {result['source']}, Page: {page}")
                    seen_pages.add(page)
                attach_image = True
        context = "\n".join(context_parts)
        if not context or len(context) < 10:
            context = "Unfortunately, no relevant context was found."

        # Pick the prompt template that matches the user's preference flags.
        if env_recommendations and personalized_recommendations:
            prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                history=history,
                user_details=self.chat_session.get_personalized_recommendation(),
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        elif personalized_recommendations:
            prompt = PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                user_details=self.chat_session.get_personalized_recommendation(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        elif env_recommendations:
            prompt = ENVIRONMENTAL_PROMPT.format(
                user_name=name,
                user_age=age,
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )
        else:
            prompt = DEFAULT_PROMPT.format(
                previous_history=history,
                context=context,
                current_query=enhanced_query
            )

        prompt = prompt + language_prompt
        response = Model().llm(prompt, original_query)
        end_time = datetime.now(timezone.utc)
        keywords = self.extract_keywords_yake(response, language=language) if keywords_permission else ""
        references = references if reference_permission else ""
        if not attach_image:
            # Without a confident database hit, skip images and keywords.
            image_results = ""
            keywords = ""
        chat_data = {
            "query": enhanced_query,
            "response": response,
            "references": references,
            "page_no": "",
            "keywords": keywords,
            "images": image_results,
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.chat_session.session_id
        }
        # Store only the core answer (before any recommendation sections) with the RAG details.
        match = re.search(r'(## Personal Recommendations|## Environmental Considerations)', response)
        truncated_response = response[:match.start()].strip() if match else response
        if not self.chat_session.save_details(
            session_id=self.session_id,
            context=context,
            query=enhanced_query,
            response=truncated_response,
            rag_start_time=start_time,
            rag_end_time=end_time
        ):
            raise ValueError("Failed to save the RAG details")
        if not self.chat_session.save_chat(chat_data):
            raise ValueError("Failed to save chat message")
        return chat_data

    def web_search(self, query: str) -> Dict[str, Any]:
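        """Thin wrapper that delegates to ``process_chat``."""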
        return self.process_chat(query=query)
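

# Minimal usage sketch (illustrative only): the token value and query below are
# placeholders, and running this requires the app's services and API keys to be
# configured.
if __name__ == "__main__":
    processor = ChatProcessor(token="user-auth-token", session_id=None)
    result = processor.process_chat("What can help with mild eczema flare-ups?")
    print(result["response"])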