from datetime import datetime, timezone, timedelta
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor
from yake import KeywordExtractor
from app.services.chathistory import ChatSession
from app.services.websearch import WebSearch
from app.services.llm_model import Model
from app.services.environmental_condition import EnvironmentalData
from app.services.prompts import *
from app.services.vector_database_search import VectorDatabaseSearch
from app.services.image_classification_vit import SkinDiseaseClassifier
import io
from PIL import Image
import os
import shutil
from werkzeug.utils import secure_filename

temp_dir = "temp"
os.makedirs(temp_dir, exist_ok=True)

upload_dir = "uploads"
os.makedirs(upload_dir, exist_ok=True)


class ImageProcessor:
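    """Handle a single image-based chat turn: validate the upload, classify the
    skin image, gather supporting context (web search or local vector DB), and
    build a localized response that is persisted to the chat session."""
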
    def __init__(self, token: str, session_id: str, num_results: int, num_images: int, image):
        self.token = token
        self.image = image
        self.session_id = session_id
        self.num_results = num_results
        self.num_images = num_images
        self.vectordb = VectorDatabaseSearch()
        self.chat_session = ChatSession(token, session_id)
        self.user_city = self.chat_session.get_city()
        city = self.user_city if self.user_city else ''
        self.environment_data = EnvironmentalData(city)
        self.web_searcher = WebSearch(num_results=num_results, max_images=num_images)

    def extract_keywords_yake(self, text: str, language: str, max_ngram_size: int = 2, num_keywords: int = 4) -> list:
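        """Extract up to `num_keywords` keyphrases (n-grams up to `max_ngram_size`)
        from `text` with YAKE, using the Urdu model when the session language is
        Urdu and English otherwise."""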
        lang_code = "en"
        if language.lower() == "urdu":
            lang_code = "ur"
        kw_extractor = KeywordExtractor(
            lan=lang_code,
            n=max_ngram_size,
            top=num_keywords,
            features=None
        )
        keywords = kw_extractor.extract_keywords(text)
        return [kw[0] for kw in keywords]

    def ensure_valid_session(self, title: str = None) -> str:
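        """Return a usable session id: reuse the current one if it validates,
        otherwise create a new session titled `title`."""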
        if not self.session_id or not self.session_id.strip():
            self.chat_session.create_new_session(title=title)
            self.session_id = self.chat_session.session_id
        else:
            try:
                if not self.chat_session.validate_session(self.session_id, title=title):
                    self.chat_session.create_new_session(title=title)
                    self.session_id = self.chat_session.session_id
            except ValueError:
                self.chat_session.create_new_session(title=title)
                self.session_id = self.chat_session.session_id
        return self.session_id

    def validate_upload(self):
        """Validate if the user can upload an image based on the daily limit and time restriction"""
        try:
            # Check daily upload limit
            daily_uploads = self.chat_session.get_user_daily_uploads()
            print(f"Daily uploads: {daily_uploads}")
            if daily_uploads >= 5:
                if self.chat_session.get_language().lower() == "urdu":
                    return False, "آپ کی روزانہ کی حد (5 تصاویر) پوری ہو چکی ہے۔ براہ کرم کل کوشش کریں۔"
                else:
                    return False, "You've reached your daily limit (5 images). Please try again tomorrow."

            # Check time between uploads
            last_upload_time = self.chat_session.get_user_last_upload_time()
            print(f"Last upload time: {last_upload_time}")
            if last_upload_time:
                # Ensure last_upload_time is timezone-aware
                if last_upload_time.tzinfo is None:
                    # If naive, make it timezone-aware by attaching UTC
                    last_upload_time = last_upload_time.replace(tzinfo=timezone.utc)
                # Now get the current time (which is already timezone-aware)
                now = datetime.now(timezone.utc)
                # Now both times are timezone-aware, so the subtraction will work
                time_since_last = now - last_upload_time
                print(f"Time since last: {time_since_last}")
                if time_since_last < timedelta(minutes=1):
                    seconds_remaining = 60 - time_since_last.seconds
                    print(f"Seconds remaining: {seconds_remaining}")
                    if self.chat_session.get_language().lower() == "urdu":
                        return False, f"براہ کرم {seconds_remaining} سیکنڈ انتظار کریں اور دوبارہ کوشش کریں۔"
                    else:
                        return False, f"Please wait {seconds_remaining} seconds before uploading another image."

            # Log this upload
            result = self.chat_session.log_user_image_upload()
            print(f"Logged upload: {result}")
            return True, ""
        except Exception as e:
            print(f"Error in validate_upload: {str(e)}")
            # Fail safely - if we can't validate, we should allow the upload
            return True, ""

    def process_chat(self, query: str) -> Dict[str, Any]:
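        """Run the full image-chat pipeline for `query`: rate-limit the upload,
        store the image, gate on a skin/non-skin check, classify the disease,
        build a context-aware prompt (web search or local vector DB depending on
        user preferences), and save the final response to the chat session."""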
        try:
            is_valid, message = self.validate_upload()
            if not is_valid:
                return {
                    "query": query,
                    "response": message,
                    "references": "",
                    "page_no": "",
                    "keywords": "",
                    "images": "",
                    "context": "",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.session_id or ""
                }
            profile = self.chat_session.get_name_and_age()
            name = profile['name']
            age = profile['age']
            self.chat_session.load_chat_history()
            self.chat_session.update_title(self.session_id, query)
            history = self.chat_session.format_history()
            language = self.chat_session.get_language().lower()
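            # Persist the uploaded image: write the bytes to a temp file, keep a copy
            # in memory for PIL, then copy temp -> uploads. Note that exiting the
            # single-worker ThreadPoolExecutor below waits for the copy to finish.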
            filename = secure_filename(self.image.filename)
            temp_path = os.path.join(temp_dir, filename)
            upload_path = os.path.join(upload_dir, filename)
            content = self.image.file.read()
            with open(temp_path, 'wb') as buffer:
                buffer.write(content)
            self.image.file.seek(0)
            img_content = io.BytesIO(content)
            pil_image = Image.open(img_content)
            self.image.file.seek(0)

            def background_file_ops(src, dst):
                shutil.copy2(src, dst)
                os.remove(src)

            with ThreadPoolExecutor(max_workers=1) as file_executor:
                file_executor.submit(background_file_ops, temp_path, upload_path)
            if language != "urdu":
                response1 = "Please provide a clear image of your skin with good lighting and a proper angle, without any filters! We can only analyze images of skin :)"
                response3 = "You have healthy skin, MaShaAllah! I don't notice any issues at the moment. However, based on my current confidence level of {diseases_detection_confidence}, I recommend consulting a doctor for more detailed advice and analysis."
                response4 = "I'm sorry, I'm not able to identify your skin condition yet as I'm still learning, but I hope to be able to detect any skin issues in the future. :) Right now, my confidence in identifying your skin is below 50%."
                response5 = ADVICE_REPORT_SUGGESTION
            else:
                response1 = "براہ کرم اپنی جلد کی واضح تصویر اچھی روشنی اور مناسب زاویے سے فراہم کریں، کسی فلٹر کے بغیر! ہم صرف جلد کی تصویر کا تجزیہ کر سکتے ہیں"
                response3 = "آپ کی جلد صحت مند ہے، ماشاءاللہ! مجھے اس وقت کوئی مسئلہ نظر نہیں آ رہا۔ تاہم، میری موجودہ اعتماد کی سطح {diseases_detection_confidence} کی بنیاد پر، میں مزید تفصیلی مشورے اور تجزیے کے لیے ڈاکٹر سے رجوع کرنے کی تجویز کرتا ہوں۔"
                response4 = "معذرت، میں ابھی آپ کی جلد کی حالت کی شناخت کرنے کے قابل نہیں ہوں کیونکہ میں ابھی سیکھ رہا ہوں، لیکن مجھے امید ہے کہ مستقبل میں جلد کے کسی بھی مسئلے کو پہچان سکوں گا۔ :) اس وقت آپ کی جلد کی شناخت میں میرا اعتماد 50% سے کم ہے۔"
                response5 = URDU_ADVICE_REPORT_SUGGESTION
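            # First gate: ask the multimodal LLM whether the image shows skin at all;
            # a "<no>"/"no" marker in its reply short-circuits with response1.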
            model = Model()
            result = model.llm_image(text=SKIN_NON_SKIN_PROMPT, image=pil_image)
            result_lower = result.lower().strip()
            is_negative = any(marker in result_lower for marker in ["<no>", "no"])
            if is_negative:
                chat_data = {
                    "query": query,
                    "response": response1,
                    "references": "",
                    "page_no": filename,
                    "keywords": "",
                    "images": "",
                    "context": "",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
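            # Second gate: run the ViT skin-disease classifier; healthy skin and
            # low-confidence (< 46) predictions each return a canned response.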
            diseases_detector = SkinDiseaseClassifier()
            diseases_name, diseases_detection_confidence = diseases_detector.predict(pil_image, 5)
            if diseases_name == "Healthy Skin":
                chat_data = {
                    "query": query,
                    "response": response3.format(diseases_detection_confidence=diseases_detection_confidence),
                    "references": "",
                    "page_no": filename,
                    "keywords": "",
                    "images": "",
                    "context": "",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
            elif diseases_detection_confidence < 46:
                chat_data = {
                    "query": query,
                    "response": response4,
                    "references": "",
                    "page_no": filename,
                    "keywords": "",
                    "images": "",
                    "context": "",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
            if not result:
                chat_data = {
                    "query": query,
                    "response": response1,
                    "references": "",
                    "page_no": filename,
                    "keywords": "",
                    "images": "",
                    "context": "",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
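            # From here on we have a confident disease prediction: make sure the
            # session exists, then read the user's feature toggles.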
            self.session_id = self.ensure_valid_session(title=query)
            permission = self.chat_session.get_user_preferences()
            websearch_enabled = permission.get('websearch', False)
            env_recommendations = permission.get('environmental_recommendations', False)
            personalized_recommendations = permission.get('personalized_recommendations', False)
            keywords_permission = permission.get('keywords', False)
            reference_permission = permission.get('references', False)
            language = self.chat_session.get_language().lower()
            language_prompt = LANGUAGE_RESPONSE_PROMPT.format(language=language)
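            # Context retrieval: with web search enabled, pull text and images from the
            # web for the predicted disease; otherwise fall back to the local vector DB.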
            if websearch_enabled:
                with ThreadPoolExecutor(max_workers=2) as executor:
                    future_web = executor.submit(self.web_searcher.search, diseases_name)
                    future_images = executor.submit(self.web_searcher.search_images, diseases_name)
                    web_results = future_web.result()
                    image_results = future_images.result()
                context_parts = []
                references = []
                for idx, result in enumerate(web_results, 1):
                    if result['text']:
                        context_parts.append(f"From Source {idx}: {result['text']}\n")
                        references.append(result['link'])
                context = "\n".join(context_parts)
                if env_recommendations and personalized_recommendations:
                    prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                elif personalized_recommendations:
                    prompt = PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                elif env_recommendations:
                    prompt = ENVIRONMENTAL_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                else:
                    prompt = DEFAULT_PROMPT.format(
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                prompt = prompt + f"\nThe query is related to {diseases_name}" + language_prompt
                llm_response = Model().llm(prompt, query)
                response = response5.format(
                    diseases_name=diseases_name,
                    diseases_detection_confidence=diseases_detection_confidence,
                    response=llm_response
                )
                keywords = ""
                if keywords_permission:
                    keywords = self.extract_keywords_yake(response, language=language)
                if not reference_permission:
                    references = ""
                chat_data = {
                    "query": query,
                    "response": response,
                    "references": references,
                    "page_no": filename,
                    "keywords": keywords,
                    "images": image_results,
                    "context": context,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
            else:
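                # Vector-DB path: fetch reference images, then retrieve document passages
                # about the predicted disease and keep only high-confidence hits.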
                attach_image = False
                with ThreadPoolExecutor(max_workers=2) as executor:
                    future_images = executor.submit(self.web_searcher.search_images, diseases_name)
                    image_results = future_images.result()
                results = self.vectordb.search(diseases_name, top_k=3)
                context_parts = []
                references = []
                seen_pages = set()
                for result in results:
                    confidence = result['confidence']
                    if confidence > 60:
                        context_parts.append(f"Content: {result['content']}")
                        page = result['page']
                        if page not in seen_pages:
                            references.append(f"Source: {result['source']}, Page: {page}")
                            seen_pages.add(page)
                        attach_image = True
                context = "\n".join(context_parts)
                if not context or len(context) < 10:
                    context = "Unfortunately, no context was found. Please do not answer anything and ignore any previous information or recommendations mentioned earlier in the context."
                if env_recommendations and personalized_recommendations:
                    prompt = ENVIRONMENTAL_PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                elif personalized_recommendations:
                    prompt = PERSONALIZED_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        user_details=self.chat_session.get_personalized_recommendation(),
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                elif env_recommendations:
                    prompt = ENVIRONMENTAL_PROMPT.format(
                        user_name=name,
                        user_age=age,
                        environmental_condition=self.environment_data.get_environmental_data(),
                        previous_history=history,
                        context=context,
                        current_query=query
                    )
                else:
                    prompt = DEFAULT_PROMPT.format(
                        previous_history="",
                        context=context,
                        current_query=query
                    )
                prompt = prompt + f"\nThe query is related to {diseases_name}" + language_prompt
                llm_response = Model().llm(prompt, query)
                response = response5.format(
                    diseases_name=diseases_name,
                    diseases_detection_confidence=diseases_detection_confidence,
                    response=llm_response
                )
                keywords = ""
                if keywords_permission:
                    keywords = self.extract_keywords_yake(response, language=language)
                if not reference_permission:
                    references = ""
                if not attach_image:
                    image_results = ""
                    keywords = ""
                chat_data = {
                    "query": query,
                    "response": response,
                    "references": references,
                    "page_no": filename,
                    "keywords": keywords,
                    "images": image_results,
                    "context": context,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self.chat_session.session_id
                }
                if not self.chat_session.save_chat(chat_data):
                    raise ValueError("Failed to save chat message")
                return chat_data
        except Exception as e:
            return {
                "error": str(e),
                "query": query,
                "response": "Sorry, there was an error processing your request.",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }

    def web_search(self, query: str) -> Dict[str, Any]:
        # Both the existing-session and missing-session cases are handled inside
        # process_chat (via ensure_valid_session), so simply delegate to it.
        return self.process_chat(query=query)