from __future__ import annotations

import os
import json
import base64
import time
import tempfile
import re
from typing import List, Dict, Any, Optional

try:
    from openai import OpenAI
except Exception:
    OpenAI = None

from langchain.schema import Document
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings

try:
    from gtts import gTTS
except Exception:
    gTTS = None

from .prompts import (
    SYSTEM_TEMPLATE, ANSWER_TEMPLATE_CALM, ANSWER_TEMPLATE_ADQ,
    SAFETY_GUARDRAILS, RISK_FOOTER, render_emotion_guidelines,
    NLU_ROUTER_PROMPT, SPECIALIST_CLASSIFIER_PROMPT,
    ROUTER_PROMPT,
    ANSWER_TEMPLATE_FACTUAL,
    ANSWER_TEMPLATE_GENERAL_KNOWLEDGE,
    ANSWER_TEMPLATE_GENERAL,
    QUERY_EXPANSION_PROMPT,
)

# -----------------------------
# Multimodal Processing Functions
# -----------------------------
def _openai_client() -> Optional[OpenAI]:
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    return OpenAI(api_key=api_key) if api_key and OpenAI else None


def describe_image(image_path: str) -> str:
    client = _openai_client()
    if not client:
        return "(Image description failed: OpenAI API key not configured.)"
    try:
        extension = os.path.splitext(image_path)[1].lower()
        mime_type = f"image/{'jpeg' if extension in ['.jpg', '.jpeg'] else extension.strip('.')}"
        with open(image_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode("utf-8")
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image concisely for a memory journal. Focus on people, places, and key objects. Example: 'A photo of John and Mary smiling on a bench at the park.'"},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                    ],
                }
            ],
            max_tokens=100,
        )
        return response.choices[0].message.content or "No description available."
    except Exception as e:
        return f"[Image description error: {e}]"

# -----------------------------
# NLU Classification Function (Dynamic Version)
# -----------------------------
def detect_tags_from_query(
    query: str,
    nlu_vectorstore: FAISS,
    behavior_options: list,
    emotion_options: list,
    topic_options: list,
    context_options: list,
    settings: Optional[dict] = None,
) -> Dict[str, Any]:
    """Uses a dynamic two-step NLU process: Route -> Retrieve Examples -> Classify."""
    # --- STEP 1: Route the query to determine the primary goal ---
    router_prompt = NLU_ROUTER_PROMPT.format(query=query)
    primary_goal_raw = call_llm([{"role": "user", "content": router_prompt}], temperature=0.0).strip().lower()

    # Keep two forms of the goal: a lowercase value for the metadata filter and a Title Case value for the prompt.
    goal_for_filter = "practical_planning" if "practical" in primary_goal_raw else "emotional_support"
    goal_for_prompt = "Practical Planning" if "practical" in primary_goal_raw else "Emotional Support"

    if settings and settings.get("debug_mode"):
        print(f"\n--- NLU Router ---\nGoal: {goal_for_prompt} (Filter: '{goal_for_filter}')\n------------------\n")

    # --- STEP 2: Retrieve relevant examples from the NLU vector store ---
    retriever = nlu_vectorstore.as_retriever(
        search_kwargs={"k": 2, "filter": {"primary_goal": goal_for_filter}}  # use the lowercase filter value
    )
    retrieved_docs = retriever.invoke(query)

    # Format the retrieved examples for the prompt
    selected_examples = "\n".join(
        f"User Query: \"{doc.page_content}\"\n{json.dumps(doc.metadata['classification'], indent=4)}"
        for doc in retrieved_docs
    )
    if not selected_examples:
        selected_examples = "(No relevant examples found)"
        if settings and settings.get("debug_mode"):
            print("WARNING: NLU retriever found no examples for this query.")

    # --- STEP 3: Use the Specialist Classifier with retrieved examples ---
    behavior_str = ", ".join(f'"{opt}"' for opt in behavior_options if opt != "None")
    emotion_str = ", ".join(f'"{opt}"' for opt in emotion_options if opt != "None")
    topic_str = ", ".join(f'"{opt}"' for opt in topic_options if opt != "None")
    context_str = ", ".join(f'"{opt}"' for opt in context_options if opt != "None")
    prompt = SPECIALIST_CLASSIFIER_PROMPT.format(
        primary_goal=goal_for_prompt,  # Title Case for the prompt text
        examples=selected_examples,
        behavior_options=behavior_str,
        emotion_options=emotion_str,
        topic_options=topic_str,
        context_options=context_str,
        query=query,
    )
    messages = [
        {"role": "system", "content": "You are a helpful NLU classification assistant."},
        {"role": "user", "content": prompt},
    ]
    response_str = call_llm(messages, temperature=0.1)
    if settings and settings.get("debug_mode"):
        print(f"\n--- NLU Specialist Full Response ---\n{response_str}\n----------------------------------\n")

    # --- STEP 4: Parse the final result ---
    result_dict = {"detected_behaviors": [], "detected_emotion": "None", "detected_topic": "None", "detected_contexts": []}
    try:
        start_brace = response_str.find('{')
        end_brace = response_str.rfind('}')
        if start_brace != -1 and end_brace > start_brace:
            json_str = response_str[start_brace : end_brace + 1]
            result = json.loads(json_str)
            behaviors = result.get("detected_behaviors")
            if behaviors:  # covers both None and an empty list
                result_dict["detected_behaviors"] = [b for b in behaviors if b in behavior_options]
            # Use `or` so that null/empty values from the LLM fall back to "None" instead of raising a TypeError.
            result_dict["detected_emotion"] = result.get("detected_emotion") or "None"
            result_dict["detected_topic"] = result.get("detected_topic") or "None"
            contexts = result.get("detected_contexts")
            if contexts:  # covers both None and an empty list
                result_dict["detected_contexts"] = [c for c in contexts if c in context_options]
            # Previous version, which raised a TypeError when the LLM returned null:
            # result_dict["detected_behaviors"] = [b for b in result.get("detected_behaviors", []) if b in behavior_options]
            # result_dict["detected_emotion"] = result.get("detected_emotion", "None")
            # result_dict["detected_topic"] = result.get("detected_topic", "None")
            # result_dict["detected_contexts"] = [c for c in result.get("detected_contexts", []) if c in context_options]
        return result_dict
    except (json.JSONDecodeError, AttributeError) as e:
        print(f"ERROR parsing NLU Specialist JSON: {e}")
        return result_dict
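
# The helper below is a minimal sketch (not part of the original pipeline) of how the NLU
# example store consumed by detect_tags_from_query() could be built: each example query
# becomes a Document whose metadata carries the lowercase "primary_goal" used for metadata
# filtering and the "classification" dict that is JSON-dumped into the specialist prompt.
# The example queries and labels are hypothetical placeholders.
def build_nlu_example_store_sketch() -> FAISS:
    examples = [
        {
            "query": "She keeps asking to go home even though she is already home.",
            "primary_goal": "emotional_support",
            "classification": {
                "detected_behaviors": ["exit_seeking"],
                "detected_emotion": "anxious",
                "detected_topic": "None",
                "detected_contexts": ["evening"],
            },
        },
        {
            "query": "How do I set up a weekly medication schedule for my dad?",
            "primary_goal": "practical_planning",
            "classification": {
                "detected_behaviors": [],
                "detected_emotion": "None",
                "detected_topic": "medication",
                "detected_contexts": [],
            },
        },
    ]
    docs = [
        Document(
            page_content=ex["query"],
            metadata={"primary_goal": ex["primary_goal"], "classification": ex["classification"]},
        )
        for ex in examples
    ]
    # Embed the example queries so detect_tags_from_query() can retrieve the closest ones.
    return FAISS.from_documents(docs, _default_embeddings())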

# -----------------------------
# Embeddings & VectorStore
# -----------------------------
def _default_embeddings():
    model_name = os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    return HuggingFaceEmbeddings(model_name=model_name)


def build_or_load_vectorstore(docs: List[Document], index_path: str, is_personal: bool = False) -> FAISS:
    # Only create the parent directory if index_path actually has one.
    parent_dir = os.path.dirname(index_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    if os.path.isdir(index_path) and os.path.exists(os.path.join(index_path, "index.faiss")):
        try:
            return FAISS.load_local(index_path, _default_embeddings(), allow_dangerous_deserialization=True)
        except Exception:
            pass
    if is_personal and not docs:
        docs = [Document(page_content="(This is the start of the personal memory journal.)", metadata={"source": "placeholder"})]
    vs = FAISS.from_documents(docs, _default_embeddings())
    vs.save_local(index_path)
    return vs

def texts_from_jsonl(path: str) -> List[Document]:
    out: List[Document] = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                obj = json.loads(line.strip())
                txt = obj.get("text") or ""
                if not txt.strip():
                    continue
                md = {"source": os.path.basename(path), "chunk": i}
                for k in ("behaviors", "emotion", "topic_tags", "context_tags"):
                    if k in obj and obj[k]:
                        md[k] = obj[k]
                out.append(Document(page_content=txt, metadata=md))
    except Exception:
        return []
    return out
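
# Expected JSONL record shape for texts_from_jsonl(): one JSON object per line, with only
# "text" required; the optional tag fields are copied into the document metadata when present.
# The values below are hypothetical examples:
#
#   {"text": "Maria becomes restless around sunset and asks to go home.",
#    "behaviors": ["exit_seeking"], "emotion": "anxious",
#    "topic_tags": ["sundowning"], "context_tags": ["evening"]}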

def bootstrap_vectorstore(sample_paths: List[str] | None = None, index_path: str = "data/faiss_index") -> FAISS:
    docs: List[Document] = []
    for p in (sample_paths or []):
        try:
            if p.lower().endswith(".jsonl"):
                docs.extend(texts_from_jsonl(p))
            else:
                with open(p, "r", encoding="utf-8", errors="ignore") as fh:
                    docs.append(Document(page_content=fh.read(), metadata={"source": os.path.basename(p)}))
        except Exception:
            continue
    if not docs:
        docs = [Document(page_content="(empty index)", metadata={"source": "placeholder"})]
    return build_or_load_vectorstore(docs, index_path=index_path)

# -----------------------------
# LLM Call
# -----------------------------
def call_llm(messages: List[Dict[str, str]], temperature: float = 0.6, stop: Optional[List[str]] = None) -> str:
    client = _openai_client()
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    if not client:
        return "(Offline Mode: OpenAI API key not configured.)"
    try:
        api_args = {
            "model": model,
            "messages": messages,
            "temperature": float(temperature if temperature is not None else 0.6),
        }
        if stop:
            api_args["stop"] = stop
        resp = client.chat.completions.create(**api_args)
        return (resp.choices[0].message.content or "").strip()
    except Exception as e:
        return f"[LLM API Error: {e}]"

# -----------------------------
# Prompting & RAG Chain
# -----------------------------
def make_rag_chain(
    vs_general: FAISS,
    vs_personal: FAISS,
    *,
    role: str = "patient",
    temperature: float = 0.6,
    language: str = "English",
    patient_name: str = "the patient",
    caregiver_name: str = "the caregiver",
    tone: str = "warm",
):
    """Returns a callable that performs the complete, intelligent RAG process."""

    def _format_docs(docs: List[Document], default_msg: str) -> str:
        if not docs:
            return default_msg
        unique_docs = {doc.page_content: doc for doc in docs}.values()
        return "\n".join([f"- {d.page_content.strip()}" for d in unique_docs])
    def _answer_fn(
        query: str,
        chat_history: List[Dict[str, str]],
        scenario_tag: Optional[str] = None,
        emotion_tag: Optional[str] = None,
        topic_tag: Optional[str] = None,
        context_tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        router_messages = [{"role": "user", "content": ROUTER_PROMPT.format(query=query)}]
        query_type = call_llm(router_messages, temperature=0.0).strip().lower()
        print(f"Query classified as: {query_type}")
        system_message = SYSTEM_TEMPLATE.format(
            tone=tone,
            language=language,
            patient_name=patient_name or "the patient",
            caregiver_name=caregiver_name or "the caregiver",
            guardrails=SAFETY_GUARDRAILS,
        )
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        if "general_knowledge_question" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL_KNOWLEDGE.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": ["General Knowledge"]}
        elif "factual_question" in query_type:
            print(f"Performing query expansion for: '{query}'")
            expansion_prompt = QUERY_EXPANSION_PROMPT.format(question=query)
            expansion_response = call_llm([{"role": "user", "content": expansion_prompt}], temperature=0.1)
            try:
                clean_response = expansion_response.strip().replace("```json", "").replace("```", "")
                expanded_queries = json.loads(clean_response)
                search_queries = [query] + expanded_queries
            except json.JSONDecodeError:
                search_queries = [query]
            print(f"Searching with queries: {search_queries}")
            all_docs = []
            for q in search_queries:
                all_docs.extend(vs_personal.similarity_search(q, k=2))
                all_docs.extend(vs_general.similarity_search(q, k=2))
            context = _format_docs(all_docs, "(No relevant information found in the memory journal.)")
            user_prompt = ANSWER_TEMPLATE_FACTUAL.format(context=context, question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            sources = list(set(d.metadata.get("source", "unknown") for d in all_docs))
            return {"answer": answer, "sources": sources}
        elif "general_conversation" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": []}
        else:  # Default to the original caregiving logic
            # --- Reworked search strategy to handle filters correctly ---
            # 1. Start with a general, unfiltered search to always get text-based matches.
            personal_docs = vs_personal.similarity_search(query, k=3)
            general_docs = vs_general.similarity_search(query, k=3)
            # 2. Build a filter for simple equality checks (FAISS supported).
            simple_search_filter = {}
            if scenario_tag and scenario_tag != "None":
                simple_search_filter["behaviors"] = scenario_tag.lower()
            if emotion_tag and emotion_tag != "None":
                simple_search_filter["emotion"] = emotion_tag.lower()
            if topic_tag and topic_tag != "None":
                simple_search_filter["topic_tags"] = topic_tag.lower()
            # 3. If simple filters exist, perform a second, more specific search.
            if simple_search_filter:
                print(f"Performing additional search with filter: {simple_search_filter}")
                personal_docs.extend(vs_personal.similarity_search(query, k=2, filter=simple_search_filter))
                general_docs.extend(vs_general.similarity_search(query, k=2, filter=simple_search_filter))
            # 4. If context_tags exist (unsupported by 'in'), loop through them and perform separate searches.
            if context_tags:
                print(f"Performing looped context tag search for: {context_tags}")
                for tag in context_tags:
                    context_filter = {"context_tags": tag.lower()}
                    personal_docs.extend(vs_personal.similarity_search(query, k=1, filter=context_filter))
                    general_docs.extend(vs_general.similarity_search(query, k=1, filter=context_filter))
            # 5. Combine and de-duplicate all results.
            all_docs_care = list({doc.page_content: doc for doc in personal_docs + general_docs}.values())
            # --- End of reworked search strategy ---
            personal_context = _format_docs([d for d in all_docs_care if d in personal_docs], "(No relevant personal memories found.)")
            general_context = _format_docs([d for d in all_docs_care if d in general_docs], "(No general guidance found.)")
            first_emotion = None
            for doc in all_docs_care:
                if "emotion" in doc.metadata and doc.metadata["emotion"]:
                    emotion_data = doc.metadata["emotion"]
                    first_emotion = emotion_data[0] if isinstance(emotion_data, list) else emotion_data
                if first_emotion:
                    break
            emotions_context = render_emotion_guidelines(first_emotion or emotion_tag)
            is_tagged_scenario = (
                (scenario_tag and scenario_tag != "None")
                or (emotion_tag and emotion_tag != "None")
                or (first_emotion is not None)
            )
            template = ANSWER_TEMPLATE_ADQ if is_tagged_scenario else ANSWER_TEMPLATE_CALM
            if template == ANSWER_TEMPLATE_ADQ:
                user_prompt = template.format(
                    general_context=general_context,
                    personal_context=personal_context,
                    question=query,
                    scenario_tag=scenario_tag,
                    emotions_context=emotions_context,
                    role=role,
                    language=language,
                )
            else:
                combined_context = f"General Guidance:\n{general_context}\n\nPersonal Memories:\n{personal_context}"
                user_prompt = template.format(context=combined_context, question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            high_risk_scenarios = ["exit_seeking", "wandering", "elopement"]
            if scenario_tag and scenario_tag.lower() in high_risk_scenarios:
                answer += f"\n\n---\n{RISK_FOOTER}"
            sources = list(set(d.metadata.get("source", "unknown") for d in all_docs_care))
            return {"answer": answer, "sources": sources}

    return _answer_fn

def answer_query(chain, question: str, **kwargs) -> Dict[str, Any]:
    if not callable(chain):
        return {"answer": "[Error: RAG chain is not callable]", "sources": []}
    try:
        return chain(question, **kwargs)
    except Exception as e:
        print(f"ERROR in answer_query: {e}")
        return {"answer": f"[Error executing chain: {e}]", "sources": []}

# -----------------------------
# TTS & Transcription
# -----------------------------
def synthesize_tts(text: str, lang: str = "en"):
    if not text or gTTS is None:
        return None
    try:
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as fp:
            tts = gTTS(text=text, lang=(lang or "en"))
            tts.save(fp.name)
            return fp.name
    except Exception:
        return None

def transcribe_audio(filepath: str, lang: str = "en"):
    client = _openai_client()
    if not client:
        return "[Transcription failed: API key not configured]"
    api_args = {"model": "whisper-1"}
    if lang and lang != "auto":
        api_args["language"] = lang
    try:
        with open(filepath, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(file=audio_file, **api_args)
        return transcription.text
    except Exception as e:
        return f"[Transcription error: {e}]"
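
# The block below is a minimal end-to-end sketch (not part of the original app wiring) showing
# how the pieces above compose: bootstrap the two FAISS indexes, build the RAG chain, and answer
# one query. The file paths, names, and tags are hypothetical placeholders; the LLM calls fall
# back to offline-mode strings when OPENAI_API_KEY is not set, and the module must be run with
# `python -m <package>.<module>` because of the relative `.prompts` import.
if __name__ == "__main__":
    vs_general = bootstrap_vectorstore(["data/general_guidance.jsonl"], index_path="data/faiss_index_general")
    vs_personal = bootstrap_vectorstore(["data/personal_memories.jsonl"], index_path="data/faiss_index_personal")
    chain = make_rag_chain(
        vs_general,
        vs_personal,
        role="caregiver",
        language="English",
        patient_name="Maria",
        caregiver_name="Alex",
        tone="warm",
    )
    result = answer_query(
        chain,
        "She keeps trying to leave the house in the evening. What should I do?",
        chat_history=[],
        scenario_tag="exit_seeking",
        emotion_tag="anxious",
    )
    print(result["answer"])
    print("Sources:", result["sources"])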