"""Chat request processing: query rewriting, context retrieval and LLM answer.

Context comes either from a live web search or from the vector database,
depending on the user's stored preferences.
"""
import logging
# NOTE: the order of the imports below is kept as-is on purpose; the wildcard
# import from ``app.services.prompts`` may rebind names depending on where it
# appears relative to the other imports.
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from yake import KeywordExtractor
from app.services.chathistory import ChatSession
from app.services.websearch import WebSearch
from app.services.llm_model import Model
from app.services.environmental_condition import EnvironmentalData
from app.services.prompts import *
from app.services.vector_database_search import VectorDatabaseSearch
import re

logger = logging.getLogger(__name__)

vectordb = VectorDatabaseSearch()

# Vector-database hits at or below this confidence are not used as context.
_MIN_CONFIDENCE = 60
# Retrieved context shorter than this is replaced by a "no context" notice.
_MIN_CONTEXT_LENGTH = 10
# Headings after which the response is cut before it is stored via save_details.
_RECOMMENDATION_SECTION_RE = re.compile(
    r'(## Personal Recommendations|## Environmental Considerations)'
)


class ChatProcessor:
    """Answer a user query using chat history plus web or vector-DB context."""

    def __init__(self, token: str, session_id: Optional[str] = None,
                 num_results: int = 3, num_images: int = 3):
        """Create the session, environment and web-search collaborators.

        Args:
            token: Passed to ``ChatSession`` together with ``session_id``.
            session_id: Existing session id, or ``None``.
            num_results: Forwarded to ``WebSearch`` as ``num_results``.
            num_images: Forwarded to ``WebSearch`` as ``max_images``.
        """
        self.token = token
        self.session_id = session_id
        self.num_results = num_results
        self.num_images = num_images
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city()
        # EnvironmentalData always receives a string, even with no stored city.
        self.environment_data = EnvironmentalData(self.user_city or '')
        self.web_searcher = WebSearch(num_results=num_results, max_images=num_images)
        self.web_search_required = True

    def extract_keywords_yake(self, text: str, language: str,
                              max_ngram_size: int = 2, num_keywords: int = 4) -> list:
        """Return the keyword strings YAKE extracts from ``text``.

        Args:
            text: Text to extract keywords from.
            language: ``"urdu"`` (case-insensitive) selects the ``"ur"``
                language code; anything else uses ``"en"``.
            max_ngram_size: Passed to ``KeywordExtractor`` as ``n``.
            num_keywords: Passed to ``KeywordExtractor`` as ``top``.

        Returns:
            The first element of every entry returned by ``extract_keywords``.
        """
        lang_code = "ur" if language.lower() == "urdu" else "en"
        kw_extractor = KeywordExtractor(
            lan=lang_code,
            n=max_ngram_size,
            top=num_keywords,
            features=None,
        )
        return [kw[0] for kw in kw_extractor.extract_keywords(text)]

    def ensure_valid_session(self, title: Optional[str] = None) -> str:
        """Make sure ``self.session_id`` refers to a usable session.

        A new session is created when the current id is empty/blank, when
        ``validate_session`` returns a falsy value, or when it raises
        ``ValueError``.

        Args:
            title: Forwarded to ``validate_session`` / ``create_new_session``.

        Returns:
            The session id stored on this processor afterwards.
        """
        needs_new_session = not self.session_id or not self.session_id.strip()
        if not needs_new_session:
            try:
                needs_new_session = not self.chat_session.validate_session(
                    self.session_id, title=title
                )
            except ValueError:
                needs_new_session = True

        if needs_new_session:
            self.chat_session.create_new_session(title=title)
            self.session_id = self.chat_session.session_id
        return self.session_id

    def _build_prompt(self, *, name, age, history, context, enhanced_query,
                      env_recommendations, personalized_recommendations,
                      language_prompt) -> str:
        """Pick the prompt template matching the user's preferences and fill it.

        Personalised details and environmental data are only fetched for the
        templates that use them. ``language_prompt`` is appended to the result.
        """
        if env_recommendations and personalized_recommendations:
            prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                history=history,
                user_details=self.chat_session.get_personalized_recommendation(),
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query,
            )
        elif personalized_recommendations:
            prompt = PERSONALIZED_PROMPT.format(
                user_name=name,
                user_age=age,
                user_details=self.chat_session.get_personalized_recommendation(),
                previous_history=history,
                context=context,
                current_query=enhanced_query,
            )
        elif env_recommendations:
            prompt = ENVIRONMENTAL_PROMPT.format(
                user_name=name,
                user_age=age,
                environmental_condition=self.environment_data.get_environmental_data(),
                previous_history=history,
                context=context,
                current_query=enhanced_query,
            )
        else:
            prompt = DEFAULT_PROMPT.format(
                previous_history=history,
                context=context,
                current_query=enhanced_query,
            )
        return prompt + language_prompt

    def _build_chat_record(self, *, enhanced_query, response, references,
                           keywords, images, context) -> Dict[str, Any]:
        """Assemble the dict that is saved via ``save_chat`` and returned."""
        return {
            "query": enhanced_query,
            "response": response,
            "references": references,
            "page_no": "",
            "keywords": keywords,
            "images": images,
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": self.chat_session.session_id,
        }

    def process_chat(self, query: str) -> Dict[str, Any]:
        """Answer ``query`` and persist the exchange.

        The query is first rewritten by the model using the chat history.
        Depending on the ``websearch`` preference, context comes from a web
        search or from the vector database; the vector-database path also
        stores timing details through ``save_details``.

        Args:
            query: The raw user query.

        Returns:
            The saved chat record on success. On any exception, a dict with
            ``error``, ``query``, a fixed apology ``response`` and
            ``timestamp``; the exception is logged, not re-raised.
        """
        try:
            profile = self.chat_session.get_name_and_age()
            name = profile['name']
            age = profile['age']

            self.chat_session.load_chat_history()
            self.chat_session.update_title(self.session_id, query)
            history = self.chat_session.format_history()

            history_based_prompt = HISTORY_BASED_PROMPT.format(history=history, query=query)
            enhanced_query = Model().send_message_openrouter(history_based_prompt)
            self.session_id = self.ensure_valid_session(title=enhanced_query)

            permission = self.chat_session.get_user_preferences()
            websearch_enabled = permission.get('websearch', False)
            env_recommendations = permission.get('environmental_recommendations', False)
            personalized_recommendations = permission.get('personalized_recommendations', False)
            keywords_permission = permission.get('keywords', False)
            reference_permission = permission.get('references', False)

            language = self.chat_session.get_language().lower()
            language_prompt = LANGUAGE_RESPONSE_PROMPT.format(language=language)

            if websearch_enabled:
                # Text and image searches run concurrently.
                with ThreadPoolExecutor(max_workers=2) as executor:
                    future_web = executor.submit(self.web_searcher.search, enhanced_query)
                    future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
                    web_results = future_web.result()
                    image_results = future_images.result()

                context_parts = []
                references = []
                for idx, result in enumerate(web_results, 1):
                    # Results without text contribute neither context nor a link.
                    if result['text']:
                        context_parts.append(f"From Source {idx}: {result['text']}\n")
                        references.append(result['link'])
                context = "\n".join(context_parts)

                prompt = self._build_prompt(
                    name=name,
                    age=age,
                    history=history,
                    context=context,
                    enhanced_query=enhanced_query,
                    env_recommendations=env_recommendations,
                    personalized_recommendations=personalized_recommendations,
                    language_prompt=language_prompt,
                )
                response = Model().llm(prompt, enhanced_query)

                keywords = ""
                if keywords_permission:
                    keywords = self.extract_keywords_yake(response, language=language)
                if not reference_permission:
                    references = ""

                chat_data = self._build_chat_record(
                    enhanced_query=enhanced_query,
                    response=response,
                    references=references,
                    keywords=keywords,
                    images=image_results,
                    context=context,
                )
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data

            # Vector-database path.
            attach_image = False
            # The image search is submitted to a worker thread and awaited
            # before retrieval starts.
            with ThreadPoolExecutor(max_workers=2) as executor:
                future_images = executor.submit(self.web_searcher.search_images, enhanced_query)
                image_results = future_images.result()

            start_time = datetime.now(timezone.utc)
            results = vectordb.search(query=enhanced_query, top_k=3)

            context_parts = []
            references = []
            seen_pages = set()
            for result in results:
                if result['confidence'] > _MIN_CONFIDENCE:
                    context_parts.append(f"Content: {result['content']}")
                    page = result['page']
                    # Only append a reference if the page is not seen yet.
                    if page not in seen_pages:
                        references.append(f"Source: {result['source']}, Page: {page}")
                        seen_pages.add(page)
                    attach_image = True

            context = "\n".join(context_parts)
            if len(context) < _MIN_CONTEXT_LENGTH:
                context = "There is no context found unfortunately"

            prompt = self._build_prompt(
                name=name,
                age=age,
                history=history,
                context=context,
                enhanced_query=enhanced_query,
                env_recommendations=env_recommendations,
                personalized_recommendations=personalized_recommendations,
                language_prompt=language_prompt,
            )
            # NOTE(review): this path sends the raw `query` to the model while
            # the web path sends `enhanced_query` - confirm which is intended.
            response = Model().llm(prompt, query)
            end_time = datetime.now(timezone.utc)

            keywords = ""
            if keywords_permission:
                keywords = self.extract_keywords_yake(response, language=language)
            if not reference_permission:
                references = ""
            # No confident hit: drop images and keywords from the record.
            if not attach_image:
                image_results = ""
                keywords = ""

            chat_data = self._build_chat_record(
                enhanced_query=enhanced_query,
                response=response,
                references=references,
                keywords=keywords,
                images=image_results,
                context=context,
            )

            # The stored RAG response is cut at the first recommendation heading.
            match = _RECOMMENDATION_SECTION_RE.search(response)
            truncated_response = response[:match.start()].strip() if match else response

            if not self.chat_session.save_details(
                session_id=self.session_id,
                context=context,
                query=enhanced_query,
                response=truncated_response,
                rag_start_time=start_time,
                rag_end_time=end_time,
            ):
                raise ValueError("Failed to save the RAG details")
            if not self.chat_session.save_chat(chat_data):
                raise ValueError("Failed to save chat message")
            return chat_data

        except Exception as e:
            # Top-level boundary: callers get an error payload instead of an
            # exception, so record the traceback here or it is lost.
            logger.exception("Failed to process chat for session %s", self.session_id)
            return {
                "error": str(e),
                "query": query,
                "response": "Sorry, there was an error processing your request.",
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }

    def web_search(self, query: str) -> Dict[str, Any]:
        """Process ``query`` through ``process_chat`` and return its result."""
        return self.process_chat(query=query)