faq-huggingface-model / src /RAGSample.py
brendon-ai's picture
Update src/RAGSample.py
543a618 verified
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_ollama import ChatOllama
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import Runnable
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
import chromadb
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
from typing import Optional, List
import re
import torch
import subprocess
# Load tokenizer and model separately to configure properly
from transformers import AutoTokenizer, AutoModelForCausalLM
# OPTION 1: Use Hugging Face Pipeline (Recommended for HF Spaces)
from transformers import pipeline
from langchain.llms import HuggingFacePipeline
# Disable ChromaDB telemetry to avoid the error
os.environ["ANONYMIZED_TELEMETRY"] = "False"
os.environ["CHROMA_SERVER_HOST"] = "localhost"
os.environ["CHROMA_SERVER_HTTP_PORT"] = "8000"
class ImprovedTFIDFEmbeddings(Embeddings):
"""Improved TF-IDF based embedding function with better preprocessing."""
def __init__(self):
self.vectorizer = TfidfVectorizer(
max_features=5000,
stop_words='english',
ngram_range=(1, 3),
min_df=1,
max_df=0.85,
lowercase=True,
strip_accents='unicode',
analyzer='word'
)
self.fitted = False
self.documents = []
def embed_documents(self, texts):
"""Create embeddings for a list of texts."""
if not self.fitted:
self.documents = texts
self.vectorizer.fit(texts)
self.fitted = True
# Transform texts to TF-IDF vectors
tfidf_matrix = self.vectorizer.transform(texts)
# Convert to dense arrays and normalize
embeddings = []
for i in range(tfidf_matrix.shape[0]):
embedding = tfidf_matrix[i].toarray().flatten()
# Normalize the embedding
norm = np.linalg.norm(embedding)
if norm > 0:
embedding = embedding / norm
# Pad or truncate to 512 dimensions
if len(embedding) < 512:
embedding = np.pad(embedding, (0, 512 - len(embedding)))
else:
embedding = embedding[:512]
embeddings.append(embedding.tolist())
return embeddings
def embed_query(self, text):
"""Create embedding for a single query text."""
if not self.fitted:
# If not fitted, fit with just this text
self.vectorizer.fit([text])
self.fitted = True
# Transform query to TF-IDF vector
tfidf_matrix = self.vectorizer.transform([text])
embedding = tfidf_matrix[0].toarray().flatten()
# Normalize the embedding
norm = np.linalg.norm(embedding)
if norm > 0:
embedding = embedding / norm
# Pad or truncate to 512 dimensions
if len(embedding) < 512:
embedding = np.pad(embedding, (0, 512 - len(embedding)))
else:
embedding = embedding[:512]
return embedding.tolist()
class SmartFAQRetriever(BaseRetriever):
"""Smart retriever optimized for FAQ datasets with semantic similarity."""
def __init__(self, documents: List[Document], k: int = 4):
super().__init__()
self._documents = documents
self._k = k
self._vectorizer = None # Use private attribute
@property
def documents(self):
return self._documents
@property
def k(self):
return self._k
def get_documents_with_confidence(self, query: str) -> List[dict]:
"""Return top documents and their confidence (similarity) scores."""
results = self._get_relevant_documents_with_scores(query)
return [{"document": doc.page_content, "confidence": round(score, 3)} for doc, score in results]
def _get_relevant_documents_with_scores(self, query: str) -> List[tuple[Document, float]]:
"""Retrieve documents along with similarity scores."""
if not hasattr(self, '_vectorizer') or self._vectorizer is None or not hasattr(self._vectorizer, 'vocabulary_') or not self._vectorizer.vocabulary_:
self._vectorizer = TfidfVectorizer(
max_features=3000,
stop_words='english',
ngram_range=(1, 2),
min_df=1,
max_df=0.9
)
questions = [doc.page_content.split("ANSWER:")[0].replace("QUESTION:", "").strip()
if "QUESTION:" in doc.page_content else doc.page_content
for doc in self._documents]
self._vectorizer.fit(questions)
query_vector = self._vectorizer.transform([query.lower().strip()])
question_texts = [doc.page_content.split("ANSWER:")[0].replace("QUESTION:", "").strip()
if "QUESTION:" in doc.page_content else doc.page_content
for doc in self._documents]
question_vectors = self._vectorizer.transform(question_texts)
similarities = cosine_similarity(query_vector, question_vectors).flatten()
top_indices = similarities.argsort()[-self._k:][::-1]
return [(self._documents[i], float(similarities[i])) for i in top_indices if similarities[i] > 0.1]
def _get_relevant_documents(self, query: str) -> List[Document]:
"""Retrieve documents based on semantic similarity."""
# Ensure vectorizer is fitted
if not hasattr(self, '_vectorizer') or self._vectorizer is None or not hasattr(self._vectorizer, 'vocabulary_') or not self._vectorizer.vocabulary_:
print("[SmartFAQRetriever] Fitting vectorizer...")
self._vectorizer = TfidfVectorizer(
max_features=3000,
stop_words='english',
ngram_range=(1, 2),
min_df=1,
max_df=0.9
)
questions = []
for doc in self._documents:
if "QUESTION:" in doc.page_content:
question_part = doc.page_content.split("ANSWER:")[0]
question = question_part.replace("QUESTION:", "").strip()
questions.append(question)
else:
questions.append(doc.page_content)
self._vectorizer.fit(questions)
query_lower = query.lower().strip()
# Extract questions from documents
questions = []
for doc in self._documents:
if "QUESTION:" in doc.page_content:
question_part = doc.page_content.split("ANSWER:")[0]
question = question_part.replace("QUESTION:", "").strip()
questions.append(question)
else:
questions.append(doc.page_content)
# Transform query and questions to TF-IDF vectors
query_vector = self._vectorizer.transform([query_lower])
question_vectors = self._vectorizer.transform(questions)
# Calculate cosine similarities
similarities = cosine_similarity(query_vector, question_vectors).flatten()
# Get top k documents
top_indices = similarities.argsort()[-self._k:][::-1]
# Return documents with highest similarity scores
relevant_docs = [self._documents[i] for i in top_indices if similarities[i] > 0.1]
if not relevant_docs:
# Fallback to first k documents if no good matches
relevant_docs = self._documents[:self._k]
return relevant_docs
async def _aget_relevant_documents(self, query: str) -> List[Document]:
"""Async version of get_relevant_documents."""
return self._get_relevant_documents(query)
def setup_retriever(use_kaggle_data: bool = False, kaggle_dataset: Optional[str] = None,
kaggle_username: Optional[str] = None, kaggle_key: Optional[str] = None,
use_local_mental_health_data: bool = False) -> BaseRetriever:
"""
Creates a vector store with documents from test data, Kaggle datasets, or local mental health data.
Args:
use_kaggle_data: Whether to load Kaggle data instead of test documents
kaggle_dataset: Kaggle dataset name (e.g., 'username/dataset-name')
kaggle_username: Your Kaggle username (optional if using kaggle.json)
kaggle_key: Your Kaggle API key (optional if using kaggle.json)
use_local_mental_health_data: Whether to load local mental health FAQ data
"""
print("Setting up the retriever...")
if use_local_mental_health_data:
try:
print("Loading mental health FAQ data from local file...")
mental_health_file = "data/Mental_Health_FAQ.csv"
if not os.path.exists(mental_health_file):
print(f"Mental health FAQ file not found: {mental_health_file}")
use_local_mental_health_data = False
else:
# Load mental health FAQ data
df = pd.read_csv(mental_health_file)
documents = []
for _, row in df.iterrows():
question = row['Questions']
answer = row['Answers']
# Create document in FAQ format
content = f"QUESTION: {question}\nANSWER: {answer}"
documents.append(Document(page_content=content))
print(f"Loaded {len(documents)} mental health FAQ documents")
for i, doc in enumerate(documents[:3]):
print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...")
except Exception as e:
print(f"Error loading mental health data: {e}")
use_local_mental_health_data = False
if use_kaggle_data and kaggle_dataset:
try:
from src.kaggle_loader import KaggleDataLoader
print(f"Loading Kaggle dataset: {kaggle_dataset}")
# Create loader without parameters - it will auto-load from kaggle.json
loader = KaggleDataLoader()
# Download the dataset
dataset_path = loader.download_dataset(kaggle_dataset)
# Load documents based on file type - only process files from this specific dataset
documents = []
# Get the dataset name to identify the correct files
dataset_name = kaggle_dataset.split('/')[-1]
print(f"Processing files in dataset directory: {dataset_path}")
for file in os.listdir(dataset_path):
file_path = os.path.join(dataset_path, file)
if file.endswith('.csv'):
print(f"Loading CSV file: {file}")
# For FAQ datasets, use the improved loading method
if 'faq' in file.lower() or 'mental' in file.lower():
documents.extend(loader.load_csv_dataset(file_path, [], chunk_size=50))
else:
# For other CSV files, use first few columns as text
df = pd.read_csv(file_path)
text_columns = df.columns[:3].tolist() # Use first 3 columns
documents.extend(loader.load_csv_dataset(file_path, text_columns, chunk_size=50))
elif file.endswith('.json'):
print(f"Loading JSON file: {file}")
documents.extend(loader.load_json_dataset(file_path))
elif file.endswith('.txt'):
print(f"Loading text file: {file}")
documents.extend(loader.load_text_dataset(file_path))
print(f"Loaded {len(documents)} documents from Kaggle dataset")
for i, doc in enumerate(documents[:3]):
print(f"Sample doc {i+1}: {doc.page_content[:200]}")
except Exception as e:
print(f"Error loading Kaggle data: {e}")
print("Falling back to test documents...")
use_kaggle_data = False
if not use_kaggle_data and not use_local_mental_health_data:
# No test documents - use mental health data as default
print("No specific data source specified, loading mental health FAQ data as default...")
try:
mental_health_file = "data/Mental_Health_FAQ.csv"
if not os.path.exists(mental_health_file):
raise FileNotFoundError(f"Mental health FAQ file not found: {mental_health_file}")
# Load mental health FAQ data
df = pd.read_csv(mental_health_file)
documents = []
for _, row in df.iterrows():
question = row['Questions']
answer = row['Answers']
# Create document in FAQ format
content = f"QUESTION: {question}\nANSWER: {answer}"
documents.append(Document(page_content=content))
print(f"Loaded {len(documents)} mental health FAQ documents")
for i, doc in enumerate(documents[:3]):
print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...")
except Exception as e:
print(f"Error loading mental health data: {e}")
raise Exception("No valid data source available. Please ensure mental health FAQ data is present or provide Kaggle credentials.")
print("Creating TF-IDF embeddings...")
embeddings = ImprovedTFIDFEmbeddings()
print("Creating ChromaDB vector store...")
client = chromadb.PersistentClient(path="./tmp/chroma_db")
# Clear existing collections to prevent mixing old and new data
try:
collections = client.list_collections()
for collection in collections:
print(f"Deleting existing collection: {collection.name}")
client.delete_collection(collection.name)
except Exception as e:
print(f"Warning: Could not clear existing collections: {e}")
print(f"Processing {len(documents)} documents...")
# Check if this is a FAQ dataset and use smart retriever
if any("QUESTION:" in doc.page_content for doc in documents):
print("Using SmartFAQRetriever for better semantic matching...")
return SmartFAQRetriever(documents, k=4)
else:
# Use vector store for non-FAQ datasets
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
client=client
)
print("Retriever setup complete.")
return vectorstore.as_retriever(k=4)
# def setup_rag_chain() -> Runnable:
# """Sets up the RAG chain with a prompt template and an LLM."""
# # Define the prompt template for the LLM
# prompt = PromptTemplate(
# template="""You are an assistant for question-answering tasks.
# Use the following documents to answer the question.
# If you don't know the answer, just say that you don't know.
# Use three sentences maximum and keep the answer concise:
# Question: {question}
# Documents: {documents}
# Answer:
# """,
# input_variables=["question", "documents"],
# )
# # Initialize the LLM with dolphin-llama3:8b model
# # Note: This requires the Ollama server to be running with the specified model
# llm = ChatOllama(
# model="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B",
# temperature=0,
# )
# # Create a chain combining the prompt template and LLM
# return prompt | llm | StrOutputParser()
def setup_rag_chain() -> Runnable:
"""Sets up the RAG chain with a prompt template and an LLM."""
# Define the prompt template for the LLM
prompt = PromptTemplate(
template="""Context: You are a medical information assistant that answers health questions using verified medical documents.
Primary Task: Answer the medical question using ONLY the provided documents.
Instructions:
1. For medical questions: Provide a clear, accurate answer based solely on the document content
2. If documents lack sufficient information: "I don't have enough information in the provided documents to answer this question"
3. For non-medical questions: "I specialize in medical information. Please ask a health-related question."
4. For identity questions: "I am a medical information assistant designed to help answer health-related questions based on verified medical documents."
5. Always use patient-friendly language
6. Keep responses 2-4 sentences maximum
7. For serious symptoms, recommend consulting healthcare professionals
Documents: {documents}
Question: {question}
Medical Answer:""",
input_variables=["question", "documents"],
)
try:
tokenizer = AutoTokenizer.from_pretrained("HuggingFaceTB/SmolLM3-3B")
model = AutoModelForCausalLM.from_pretrained(
"HuggingFaceTB/SmolLM3-3B",
device_map="auto",
torch_dtype=torch.float16
)
# Fix the tokenizer configuration properly
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
print(f"Tokenizer pad_token_id: {tokenizer.pad_token_id}")
print(f"Tokenizer eos_token_id: {tokenizer.eos_token_id}")
# Initialize pipeline with correct token IDs from tokenizer
hf_pipeline = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
max_new_tokens=50, # Start small for testing
temperature=0.2,
return_full_text=False,
do_sample=True,
# Use actual tokenizer token IDs, not hardcoded values
pad_token_id=tokenizer.pad_token_id,
eos_token_id=tokenizer.eos_token_id,
clean_up_tokenization_spaces=True
)
# Test the pipeline with a simple input
test_input = "What is diabetes?"
print(f"Testing pipeline with: {test_input}")
test_result = hf_pipeline(test_input)
print(f"Pipeline test successful: {test_result}")
except Exception as e:
print(f"Error setting up BioGPT: {e}")
print("Falling back to DistilGPT-2...")
# Fallback to a more stable model
hf_pipeline = pipeline(
"text-generation",
model="distilgpt2",
max_new_tokens=50,
temperature=0.2,
return_full_text=False,
do_sample=True,
clean_up_tokenization_spaces=True
)
# Test the fallback pipeline
test_input = "What is diabetes?"
print(f"Testing fallback pipeline with: {test_input}")
test_result = hf_pipeline(test_input)
print(f"Fallback pipeline test successful: {test_result}")
# Wrap it in LangChain
llm = HuggingFacePipeline(pipeline=hf_pipeline)
# Create a chain combining the prompt template and LLM
return prompt | llm | StrOutputParser()
# Also update the RAG application class with better error handling
class RAGApplication:
def __init__(self, retriever: BaseRetriever, rag_chain: Runnable):
self.retriever = retriever
self.rag_chain = rag_chain
# def run(self, question: str) -> str:
# """Runs the RAG pipeline for a given question."""
# try:
# # Input validation
# if not question or not question.strip():
# return "Please provide a valid question."
# question = question.strip()
# print(f"\nProcessing question: '{question}'")
# # Retrieve relevant documents
# documents = self.retriever.invoke(question)
# # Debug: Print retrieved documents
# print(f"DEBUG: Retrieved {len(documents)} documents")
# for i, doc in enumerate(documents):
# print(f"DEBUG: Document {i+1}: {doc.page_content[:200]}...")
# # Extract content from retrieved documents
# doc_texts = "\n\n".join([doc.page_content for doc in documents])
# # Limit the total input length to prevent token overflow
# max_input_length = 500 # Conservative limit
# if len(doc_texts) > max_input_length:
# doc_texts = doc_texts[:max_input_length] + "..."
# print(f"DEBUG: Truncated document text to {max_input_length} characters")
# print(f"DEBUG: Combined document text length: {len(doc_texts)}")
# # Get the answer from the language model
# print("DEBUG: Calling language model...")
# answer = self.rag_chain.invoke({"question": question, "documents": doc_texts})
# print(f"DEBUG: Language model response: {answer}")
# return answer
# except Exception as e:
# print(f"Error in RAG application: {str(e)}")
# import traceback
# traceback.print_exc()
# return f"I apologize, but I encountered an error processing your question: {str(e)}. Please try rephrasing it or ask a different question."
def run(self, question: str) -> str:
try:
if not question.strip():
return "Please provide a valid question."
print(f"\nProcessing question: '{question}'")
if hasattr(self.retriever, "get_documents_with_confidence"):
docs_with_scores = self.retriever.get_documents_with_confidence(question)
documents = [Document(page_content=d["document"]) for d in docs_with_scores]
confidence_info = "\n".join([f"- Score: {d['confidence']}, Snippet: {d['document'][:100]}..." for d in docs_with_scores])
else:
documents = self.retriever.invoke(question)
confidence_info = "Confidence scoring not available."
print(f"Retrieved {len(documents)} documents")
print(confidence_info)
doc_texts = "\n\n".join([doc.page_content for doc in documents])
if len(doc_texts) > 500:
doc_texts = doc_texts[:500] + "..."
answer = self.rag_chain.invoke({"question": question, "documents": doc_texts})
# Append confidence footer
footer = "\n\n(Note: This answer is based on documents with confidence scores. Review full context if critical.)"
return answer.strip() + footer
except Exception as e:
print(f"Error in RAG application: {str(e)}")
import traceback
traceback.print_exc()
return f"I apologize, but I encountered an error processing your question: {str(e)}. Please try rephrasing it or ask a different question."
# Main execution block
if __name__ == "__main__":
load_dotenv()
# 1. Setup the components
retriever = setup_retriever()
rag_chain = setup_rag_chain()
# 2. Initialize the RAG application
rag_application = RAGApplication(retriever, rag_chain)
# 3. Run an example query
question = "What is terminal illness?"
print("\n--- Running RAG Application ---")
print(f"Question: {question}")
answer = rag_application.run(question)
print(f"Answer: {answer}")