from datetime import datetime, timezone
from typing import Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from yake import KeywordExtractor
from app.services.chathistory import ChatSession
from app.services.websearch import WebSearch
from app.services.llm_model import Model
from app.services.environmental_condition import EnvironmentalData
from app.services.prompts import *
from app.services.vector_database_search import VectorDatabaseSearch
import re
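# Module-level vector store client, shared by every ChatProcessor instance.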
vectordb = VectorDatabaseSearch()
class ChatProcessor:
    def __init__(self, token: str, session_id: Optional[str] = None, num_results: int = 3, num_images: int = 3):
        self.token = token
        self.session_id = session_id
        self.num_results = num_results
        self.num_images = num_images
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city()
        city = self.user_city if self.user_city else ''
        self.environment_data = EnvironmentalData(city)
        self.web_searcher = WebSearch(num_results=num_results, max_images=num_images)
        self.web_search_required = True
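    # Extract salient keywords from a response with YAKE; Urdu maps to "ur",
    # every other language falls back to English.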
    def extract_keywords_yake(self, text: str, language: str, max_ngram_size: int = 2, num_keywords: int = 4) -> list:
        lang_code = "en"
        if language.lower() == "urdu":
            lang_code = "ur"
        kw_extractor = KeywordExtractor(
            lan=lang_code,
            n=max_ngram_size,
            top=num_keywords,
            features=None
        )
        keywords = kw_extractor.extract_keywords(text)
        return [kw[0] for kw in keywords]
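    # Guarantee a usable session: reuse a valid existing session id, or create
    # a fresh session (titled with the current query) when it is missing or invalid.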
    def ensure_valid_session(self, title: Optional[str] = None) -> str:
        if not self.session_id or not self.session_id.strip():
            self.chat_session.create_new_session(title=title)
            self.session_id = self.chat_session.session_id
        else:
            try:
                if not self.chat_session.validate_session(self.session_id, title=title):
                    self.chat_session.create_new_session(title=title)
                    self.session_id = self.chat_session.session_id
            except ValueError:
                self.chat_session.create_new_session(title=title)
                self.session_id = self.chat_session.session_id
        return self.session_id
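    # Main entry point: rewrites the query using chat history, then answers it
    # either from live web search results or from the local vector database,
    # depending on the user's "websearch" preference.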
    def process_chat(self, query: str) -> Dict[str, Any]:
        try:
            profile = self.chat_session.get_name_and_age()
            name = profile['name']
            age = profile['age']
            self.chat_session.load_chat_history()
            self.chat_session.update_title(self.session_id, query)
            history = self.chat_session.format_history()
            history_based_prompt = HISTORY_BASED_PROMPT.format(history=history, query=query)
            enhanced_query = Model().send_message_openrouter(history_based_prompt)
            self.session_id = self.ensure_valid_session(title=enhanced_query)
            permission = self.chat_session.get_user_preferences()
            websearch_enabled = permission.get('websearch', False)
            env_recommendations = permission.get('environmental_recommendations', False)
            personalized_recommendations = permission.get('personalized_recommendations', False)
            keywords_permission = permission.get('keywords', False)
            reference_permission = permission.get('references', False)
            language = self.chat_session.get_language().lower()
            language_prompt = LANGUAGE_RESPONSE_PROMPT.format(language=language)
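            # Web-search path: fetch pages and images concurrently and build
            # the prompt context from the scraped results.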
            if websearch_enabled:
                with ThreadPoolExecutor(max_workers=2) as executor:
                    future_web = executor.submit(self.web_searcher.search, enhanced_query)
                    future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
                    web_results = future_web.result()
                    image_results = future_images.result()
                context_parts = []
                references = []
                for idx, result in enumerate(web_results, 1):
                    if result['text']:
                        context_parts.append(f"From Source {idx}: {result['text']}\n")
                        references.append(result['link'])
                context = "\n".join(context_parts)
                if env_recommendations and personalized_recommendations:
                    prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        history=history,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                elif personalized_recommendations:
                    prompt = PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                elif env_recommendations:
                    prompt = ENVIRONMENTAL_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                else:
                    prompt = DEFAULT_PROMPT.format(
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                prompt = prompt + language_prompt
                response = Model().llm(prompt, enhanced_query)
                keywords = ""
                if keywords_permission:
                    keywords = self.extract_keywords_yake(response, language=language)
                if not reference_permission:
                    references = ""
                chat_data = {
                    "query": enhanced_query,
                    "response": response,
                    "references": references,
                    "page_no": "",
                    "keywords": keywords,
                    "images": image_results,
                    "context": context,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
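            # Vector-database (RAG) path: answer from locally indexed documents
            # and only keep images and keywords when a confident match is found.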
            else:
                attach_image = False
                with ThreadPoolExecutor(max_workers=2) as executor:
                    future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
                    image_results = future_images.result()
                start_time = datetime.now(timezone.utc)
                results = vectordb.search(query=enhanced_query, top_k=3)
                context_parts = []
                references = []
                seen_pages = set()
                for result in results:
                    confidence = result['confidence']
                    if confidence > 60:
                        context_parts.append(f"Content: {result['content']}")
                        page = result['page']
                        if page not in seen_pages:  # only reference each page once
                            references.append(f"Source: {result['source']}, Page: {page}")
                            seen_pages.add(page)
                        attach_image = True
                context = "\n".join(context_parts)
                if not context or len(context) < 10:
                    context = "No relevant context was found."
                if env_recommendations and personalized_recommendations:
                    prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        history=history,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                elif personalized_recommendations:
                    prompt = PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                elif env_recommendations:
                    prompt = ENVIRONMENTAL_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                else:
                    prompt = DEFAULT_PROMPT.format(
                        previous_history=history,
                        context=context,
                        current_query=enhanced_query
                    )
                prompt = prompt + language_prompt
                response = Model().llm(prompt, query)
                end_time = datetime.now(timezone.utc)
                keywords = ""
                if keywords_permission:
                    keywords = self.extract_keywords_yake(response, language=language)
                if not reference_permission:
                    references = ""
                if not attach_image:
                    image_results = ""
                    keywords = ""
                chat_data = {
                    "query": enhanced_query,
                    "response": response,
                    "references": references,
                    "page_no": "",
                    "keywords": keywords,
                    "images": image_results,
                    "context": context,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                # save_details stores the answer without the trailing recommendation sections.
                match = re.search(r'(## Personal Recommendations|## Environmental Considerations)', response)
                if match:
                    truncated_response = response[:match.start()].strip()
                else:
                    truncated_response = response
                if not self.chat_session.save_details(
                    session_id=self.session_id,
                    context=context,
                    query=enhanced_query,
                    response=truncated_response,
                    rag_start_time=start_time,
                    rag_end_time=end_time
                ):
                    raise ValueError("Failed to save the RAG details")
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
        except Exception as e:
            return {
                "error": str(e),
                "query": query,
                "response": "Sorry, there was an error processing your request.",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
    def web_search(self, query: str) -> Dict[str, Any]:
        # Session handling is delegated to process_chat, which validates or
        # creates the session via ensure_valid_session(), so both new and
        # existing sessions take the same path.
        return self.process_chat(query=query)
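# Minimal usage sketch (illustrative only): the token, session handling, and
# query below are placeholder assumptions, not values defined by this module.
if __name__ == "__main__":
    # Passing session_id=None makes process_chat create a new session.
    processor = ChatProcessor(token="YOUR_AUTH_TOKEN", session_id=None)
    result = processor.process_chat("What precautions should I take today?")
    print(result.get("response") or result.get("error"))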