import os
import json
import fitz  # PyMuPDF
import nltk
import chromadb
from tqdm import tqdm
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer, util
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import pytesseract
from PIL import Image
import io
import gradio as gr
# ---------------------------
# ⚙️ Configuration
# ---------------------------
pdf_folder = r"./Manuals"  # Path relative to the app.py file in the Space
output_jsonl_pages = "manual_pages_with_ocr.jsonl"
output_jsonl_chunks = "manual_chunks_with_ocr.jsonl"
chroma_path = "./chroma_store"
collection_name = "manual_chunks"
chunk_size = 750
chunk_overlap = 100
MAX_CONTEXT_CHUNKS = 3  # Max chunks to send to the LLM

# Hugging Face model configuration
HF_MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
# Read the HF token from an environment variable for security
HF_TOKEN = os.environ.get("HF_TOKEN")  # Hugging Face Space secret name
# ---------------------------
# Ensure NLTK resources are available
# ---------------------------
# nltk.data.find raises LookupError when a resource is missing
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")
# Newer NLTK releases also expect the "punkt_tab" tables for sent_tokenize
try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download("punkt_tab")
# ---------------------------
# 📄 Utility: Read PDF page text (with OCR fallback)
# ---------------------------
# This combines the logic of extract_text_from_pdf and extract_text_from_page
def extract_text_from_page_with_ocr(page):
    text = page.get_text().strip()
    if text:
        return text, False  # native text found, no OCR needed
    # If native text is missing, try OCR
    try:
        pix = page.get_pixmap(dpi=300)
        img_data = pix.tobytes("png")
        img = Image.open(io.BytesIO(img_data))
        ocr_text = pytesseract.image_to_string(img).strip()
        return ocr_text, True
    except Exception as e:
        print(f"OCR failed for a page: {e}")
        return "", False  # Return empty text and indicate OCR was not used
# ---------------------------
# 🧹 Clean up lines (from original notebook)
# ---------------------------
def clean_text(text):
    lines = text.splitlines()
    lines = [line.strip() for line in lines if line.strip()]
    return "\n".join(lines)
# ---------------------------
# ✂️ Sentence tokenizer (from original notebook)
# ---------------------------
def tokenize_sentences(text):
    return sent_tokenize(text)
# ---------------------------
# 📦 Chunk into fixed-size blocks (from original notebook)
# ---------------------------
def split_into_chunks(sentences, max_tokens=750, overlap=100):
    chunks = []
    current_chunk = []
    current_len = 0
    for sentence in sentences:
        token_count = len(sentence.split())
        # If adding the next sentence would exceed max_tokens and the current
        # chunk is not empty, close the current chunk
        if current_len + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            # Start the next chunk with roughly `overlap` words of context,
            # taken as whole sentences from the end of the previous chunk
            # (the overlap budget is in words, matching max_tokens)
            overlap_sentences = []
            overlap_len = 0
            for s in reversed(current_chunk):
                s_len = len(s.split())
                if overlap_len + s_len > overlap:
                    break
                overlap_sentences.insert(0, s)
                overlap_len += s_len
            current_chunk = overlap_sentences
            current_len = overlap_len
        # Add the current sentence and update the running length
        current_chunk.append(sentence)
        current_len += token_count
    # Add the last chunk if it's not empty
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
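
# Minimal usage sketch of the chunker (hypothetical toy input, not part of the
# real pipeline), kept as a comment so it doesn't run at import time:
#
#   toy_sentences = ["alpha beta gamma delta epsilon"] * 6   # six 5-word sentences
#   toy_chunks = split_into_chunks(toy_sentences, max_tokens=12, overlap=5)
#   # -> each chunk holds at most ~12 words, and consecutive chunks share
#   #    roughly the last 5 words (one sentence) of the previous chunk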
# ---------------------------
# 🔧 Extract metadata from filename (from original notebook)
# ---------------------------
def extract_metadata_from_filename(filename):
    name = filename.lower().replace("_", " ").replace("-", " ")
    metadata = {
        "model": "unknown",
        "doc_type": "unknown",
        "brand": "life fitness"  # Assuming 'life fitness' is constant, as in the notebook
    }
    if "om" in name or "owner" in name:
        metadata["doc_type"] = "owner manual"
    elif "sm" in name or "service" in name:
        metadata["doc_type"] = "service manual"
    elif "assembly" in name:
        metadata["doc_type"] = "assembly instructions"
    elif "alert" in name:
        metadata["doc_type"] = "installer alert"
    elif "parts" in name:
        metadata["doc_type"] = "parts manual"
    elif "bulletin" in name:
        metadata["doc_type"] = "service bulletin"

    known_models = [
        "se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl",
        "everest", "engage", "inspire", "discover", "95t", "95x", "95c", "95r", "97c"
    ]
    for model in known_models:
        # Simple substring check (as in the notebook); regex could make this more robust
        if model.replace(" ", "") in name.replace(" ", ""):
            metadata["model"] = model
            break
    return metadata
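
# Example of the mapping this function produces (hypothetical filename, not an
# actual file from the Manuals folder):
#
#   extract_metadata_from_filename("SE3HD_OM_Console.pdf")
#   # -> {"model": "se3hd", "doc_type": "owner manual", "brand": "life fitness"}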
# ---------------------------
# 📄 Step 1: Process PDFs, extract pages with OCR
# ---------------------------
def process_pdfs_for_pages(pdf_folder, output_jsonl):
    print("Starting PDF processing and OCR...")
    all_pages = []
    if not os.path.exists(pdf_folder):
        print(f"Error: PDF folder not found at {pdf_folder}")
        return []  # Return an empty list if the folder doesn't exist
    pdf_files = [f for f in os.listdir(pdf_folder) if f.lower().endswith(".pdf")]
    if not pdf_files:
        print(f"No PDF files found in {pdf_folder}")
        return []
    for pdf_file in tqdm(pdf_files, desc="Scanning PDFs"):
        path = os.path.join(pdf_folder, pdf_file)
        try:
            doc = fitz.open(path)
            for page_num, page in enumerate(doc, start=1):
                text, used_ocr = extract_text_from_page_with_ocr(page)
                if text:  # Only save pages with extracted text
                    all_pages.append({
                        "source_file": pdf_file,
                        "page": page_num,
                        "text": text,
                        "ocr_used": used_ocr
                    })
            doc.close()  # Close the document
        except Exception as e:
            print(f"Error processing {pdf_file}: {e}")
            continue  # Skip to the next file
    with open(output_jsonl, "w", encoding="utf-8") as f:
        for page in all_pages:
            json.dump(page, f)
            f.write("\n")
    print(f"✅ Saved {len(all_pages)} pages to {output_jsonl} (with OCR fallback)")
    return all_pages  # Return the list of pages
# ---------------------------
# 📦 Step 2: Chunk the pages
# ---------------------------
def chunk_pages(input_jsonl, output_jsonl, chunk_size, chunk_overlap):
    print("Starting page chunking...")
    all_chunks = []
    if not os.path.exists(input_jsonl):
        print(f"Error: Input JSONL file not found at {input_jsonl}. Run PDF processing first.")
        return []
    try:
        with open(input_jsonl, "r", encoding="utf-8") as f:
            # Count lines for the tqdm progress bar
            total_lines = sum(1 for _ in f)
            f.seek(0)  # Reset the file pointer to the beginning
            for line in tqdm(f, total=total_lines, desc="Chunking pages"):
                try:
                    page = json.loads(line)
                    source_file = page["source_file"]
                    page_number = page["page"]
                    text = page["text"]
                    metadata = extract_metadata_from_filename(source_file)
                    sentences = tokenize_sentences(clean_text(text))  # Clean and tokenize the page text
                    chunks = split_into_chunks(sentences, max_tokens=chunk_size, overlap=chunk_overlap)
                    for i, chunk in enumerate(chunks):
                        # Skip empty chunks
                        if chunk.strip():
                            all_chunks.append({
                                "source_file": source_file,
                                "chunk_id": f"{source_file}::page_{page_number}::chunk_{i+1}",
                                "page": page_number,
                                "ocr_used": page.get("ocr_used", False),  # Use .get for safety
                                "model": metadata.get("model", "unknown"),
                                "doc_type": metadata.get("doc_type", "unknown"),
                                "brand": metadata.get("brand", "life fitness"),
                                "text": chunk.strip()  # Strip leading/trailing whitespace
                            })
                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line: {line}")
                except Exception as e:
                    print(f"Error processing page from {line}: {e}")
                    continue  # Continue with the next line
    except Exception as e:
        print(f"Error opening or reading input JSONL file: {e}")
        return []
    if not all_chunks:
        print("No chunks were created.")
    with open(output_jsonl, "w", encoding="utf-8") as f:
        for chunk in all_chunks:
            json.dump(chunk, f)
            f.write("\n")
    print(f"✅ Done! {len(all_chunks)} chunks saved to {output_jsonl}")
    return all_chunks  # Return the list of chunks
# ---------------------------
# 🔍 Step 3: Embed chunks into Chroma
# ---------------------------
def embed_chunks_into_chroma(jsonl_path, chroma_path, collection_name):
    global embedder  # Store the embedder globally so the query function can reuse it
    print("Starting ChromaDB embedding...")
    try:
        embedder = SentenceTransformer("all-MiniLM-L6-v2")
        embedder.eval()
        print("✅ SentenceTransformer model loaded.")
    except Exception as e:
        print(f"❌ Error loading SentenceTransformer model: {e}")
        return None, "Error loading SentenceTransformer model."
    try:
        # Use a persistent client
        client = chromadb.PersistentClient(path=chroma_path)
        # If the collection already exists, delete it so it can be rebuilt
        try:
            client.get_collection(name=collection_name)
            client.delete_collection(collection_name)
            print(f"Deleted existing collection: {collection_name}")
        except Exception:  # Collection does not exist, which is fine
            pass
        collection = client.create_collection(name=collection_name)
        print(f"✅ ChromaDB collection '{collection_name}' created.")
    except Exception as e:
        print(f"❌ Error initializing ChromaDB: {e}")
        return None, "Error initializing ChromaDB."
    texts, metadatas, ids = [], [], []
    batch_size = 16  # Batch size for embedding
    if not os.path.exists(jsonl_path):
        print(f"Error: Input JSONL file not found at {jsonl_path}. Run chunking first.")
        return None, "Input chunk file not found."
    try:
        with open(jsonl_path, "r", encoding="utf-8") as f:
            # Count lines for the tqdm progress bar
            total_lines = sum(1 for _ in f)
            f.seek(0)  # Reset the file pointer to the beginning
            for line in tqdm(f, total=total_lines, desc="Embedding chunks"):
                try:
                    item = json.loads(line)
                    texts.append(item.get("text", ""))  # Use .get for safety
                    ids.append(item.get("chunk_id", f"unknown_{len(ids)}"))  # Ensure a chunk_id exists
                    # Prepare metadata, coercing keys and values to strings and excluding the text itself
                    metadata = {str(k): str(v) for k, v in item.items() if k != "text"}
                    metadatas.append(metadata)
                    if len(texts) >= batch_size:
                        embeddings = embedder.encode(texts).tolist()
                        collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)
                        texts, metadatas, ids = [], [], []  # Reset the batch
                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line during embedding: {line}")
                except Exception as e:
                    print(f"Error processing chunk line {line} during embedding: {e}")
                    continue  # Continue with the next line
            # Add any remaining items in the last batch
            if texts:
                embeddings = embedder.encode(texts).tolist()
                collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)
        print("✅ All OCR-enhanced chunks embedded in Chroma!")
        return collection, None  # Return the collection and no error
    except Exception as e:
        print(f"❌ Error reading input JSONL file for embedding: {e}")
        return None, "Error reading input file for embedding."
# ---------------------------
# 🧠 Load Hugging Face model and tokenizer
# ---------------------------
# This needs to happen after the imports but before the Gradio interface
tokenizer = None
model = None
pipe = None

print(f"Attempting to load Hugging Face model: {HF_MODEL_ID}")
print(f"Using HF_TOKEN (present: {HF_TOKEN is not None})")

if not HF_TOKEN:
    print("❌ HF_TOKEN environment variable not set. Cannot load Hugging Face model.")
else:
    try:
        # Check whether CUDA is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {device}")
        # Load tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_ID, token=HF_TOKEN)
        model = AutoModelForCausalLM.from_pretrained(
            HF_MODEL_ID,
            token=HF_TOKEN,
            torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,  # bfloat16 on GPU
            device_map="auto" if torch.cuda.is_available() else None  # Let accelerate place the model on GPU
        )
        if not torch.cuda.is_available():
            # Only move the model manually when accelerate's device_map is not used;
            # calling .to() on a device_map="auto" model is not allowed
            model = model.to(device)
        # Create a pipeline for easy inference. When the model is dispatched with
        # device_map="auto", the pipeline must not be given an explicit device,
        # so the device argument is only set for the CPU case.
        pipeline_kwargs = {} if torch.cuda.is_available() else {"device": -1}
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,
            temperature=0.1,
            top_p=0.9,
            do_sample=True,
            **pipeline_kwargs
        )
        print(f"✅ Successfully loaded Hugging Face model: {HF_MODEL_ID} on {device}")
    except Exception as e:
        print(f"❌ Error loading Hugging Face model: {e}")
        print("Please ensure:")
        print("- The HF_TOKEN secret is set in your Hugging Face Space settings.")
        print("- Your Space has sufficient resources (GPU, RAM) for the model.")
        print("- You have accepted the model's terms on Hugging Face (if required).")
        tokenizer, model, pipe = None, None, None  # Reset if loading fails
# ---------------------------
# 🔍 Query function (uses the embedder and Chroma)
# ---------------------------
# The embedder is assigned globally during the embedding step above
embedder = None  # Initialize the global embedder as None

def query_manuals(question, model_filter=None, doc_type_filter=None, top_k=5, rerank_keywords=None):
    global embedder  # Access the global embedder variable
    if collection is None or embedder is None:
        print("⚠️ ChromaDB or embedder not loaded. Cannot perform vector search.")
        return []  # Return empty results if Chroma or the embedder is not loaded
    # Build the metadata filter; Chroma expects a single condition or an $and of conditions
    conditions = []
    if model_filter:
        conditions.append({"model": model_filter.lower()})
    if doc_type_filter:
        conditions.append({"doc_type": doc_type_filter.lower()})
    if not conditions:
        where_filter = None  # No filter
    elif len(conditions) == 1:
        where_filter = conditions[0]
    else:
        where_filter = {"$and": conditions}
    # Query with the same SentenceTransformer embeddings used at indexing time
    results = collection.query(
        query_embeddings=embedder.encode([question]).tolist(),
        n_results=top_k * 5,  # fetch extra candidates for reranking
        where=where_filter
    )
    if not results or not results.get("documents") or not results["documents"][0]:
        return []  # No matches
    try:
        question_embedding = embedder.encode(question, convert_to_tensor=True)
    except Exception as e:
        print(f"Error encoding question: {e}")
        return []  # Return empty results if embedding fails
    # Compute a combined semantic + keyword score for reranking
    reranked = []
    # Make sure documents and metadatas are present before iterating
    if results.get("documents") and results["documents"][0]:
        for i, text in enumerate(results["documents"][0]):
            meta = results["metadatas"][0][i]
            # Handle potential errors while embedding the chunk text
            try:
                embedding = embedder.encode(text, convert_to_tensor=True)
                # Semantic similarity
                similarity_score = float(util.cos_sim(question_embedding, embedding))
            except Exception as e:
                print(f"Error encoding chunk text for reranking: {e}. Skipping chunk.")
                continue  # Skip this chunk if encoding fails
            # Keyword score: count how many rerank keywords appear in the chunk
            keyword_score = 0
            if rerank_keywords and text:  # Ensure text is not None or empty
                for kw in rerank_keywords:
                    if kw.lower() in text.lower():
                        keyword_score += 1
            # Combine with tunable weights; the keyword count is not normalized,
            # so this is a heuristic boost rather than a true weighted average
            final_score = (0.8 * similarity_score) + (0.2 * keyword_score)
            reranked.append({
                "score": final_score,
                "text": text,
                "metadata": meta
            })
    # Sort by the combined score
    reranked.sort(key=lambda x: x["score"], reverse=True)
    return reranked[:top_k]
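
# Example call (hypothetical question text; "se3hd" is one of the known models
# listed in extract_metadata_from_filename):
#
#   hits = query_manuals("How do I enter the diagnostics menu?",
#                        model_filter="se3hd", top_k=3,
#                        rerank_keywords=["diagnostic", "menu"])
#   # -> up to 3 dicts with "score", "text" and "metadata" keys, best match first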
# ---------------------------
# 💬 Ask the Hugging Face model
# ---------------------------
def ask_hf_model(prompt):
    if pipe is None:
        return "Hugging Face model not loaded. Cannot generate response."
    try:
        # Use the Llama 3.1 chat template
        messages = [
            {"role": "system", "content": "You are a technical assistant trained to answer questions using equipment manuals. Use only the provided context to answer the question. If the answer is not clearly in the context, reply: 'I don't know.'"},
            {"role": "user", "content": prompt}
        ]
        # Apply the chat template and generate text
        prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        outputs = pipe(
            prompt_text,
            do_sample=True,
            temperature=0.1,  # Keep temperature low for more factual answers
            top_p=0.9,
            max_new_tokens=512,
            pad_token_id=tokenizer.eos_token_id  # Set pad_token_id for generation
        )
        # The pipeline output includes the prompt, so slice it off to keep only
        # the newly generated assistant response
        generated_text = outputs[0]["generated_text"]
        response = generated_text[len(prompt_text):].strip()
        # Remove a trailing EOS token if present
        if tokenizer.eos_token and response.endswith(tokenizer.eos_token):
            response = response[:-len(tokenizer.eos_token)].strip()
        return response
    except Exception as e:
        return f"❌ Error generating response from Hugging Face model: {str(e)}"
# ---------------------------
# 🎯 Full RAG pipeline
# ---------------------------
def run_rag_qa(user_question, model_filter=None, doc_type_filter=None):  # Filters are optional inputs
    # Ensure ChromaDB and the HF model pipeline are loaded before proceeding
    if collection is None:
        return "ChromaDB is not loaded. Ensure PDFs are in ./Manuals and the app started correctly."
    if pipe is None:
        return "Hugging Face model pipeline is not loaded. Ensure HF_TOKEN is set and the model loaded successfully."
    results = query_manuals(
        question=user_question,
        model_filter=model_filter,  # Use the optional filter inputs
        doc_type_filter=doc_type_filter,
        top_k=MAX_CONTEXT_CHUNKS,
        rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"]  # Example keywords
    )
    if not results:
        # Attempt a broader search if the initial filtered search yields no results
        if model_filter or doc_type_filter:
            print("No results with specified filters, trying broader search...")
            results = query_manuals(
                question=user_question,
                model_filter=None,  # Remove filters for the broader search
                doc_type_filter=None,
                top_k=MAX_CONTEXT_CHUNKS,
                rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"]
            )
            if not results:
                return "No relevant documents found for the query, even with broader search."
        else:
            return "No relevant documents found for the query."
    context = "\n\n".join([
        f"Source File: {r['metadata'].get('source_file', 'N/A')}, Page: {r['metadata'].get('page', 'N/A')}\nText: {r['text'].strip()}"
        for r in results
    ])
    prompt = f"""
Context:
{context}
Question: {user_question}
"""
    return ask_hf_model(prompt)
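
# Example end-to-end call (hypothetical question, taken from the Gradio
# placeholder below; both filters are optional and the function falls back to
# an unfiltered search when nothing matches):
#
#   answer_text = run_rag_qa("How do I access diagnostics on the SE3 console?",
#                            model_filter="se3", doc_type_filter="service manual")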
# ---------------------------
# --- Initial setup ---
# This code runs when the app starts on Hugging Face Spaces.
# It processes the PDFs, chunks them, and builds the ChromaDB index.
# ---------------------------
print("Starting initial setup...")

# Tesseract must be available on the system for the OCR fallback. Shell commands
# (lines starting with "!") don't belong in app.py; on Hugging Face Spaces,
# install it via packages.txt or a Dockerfile instead. If Tesseract isn't found,
# the OCR step will fail and scanned pages will be skipped.
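# A small optional sanity check (not in the original script): warn early if the
# Tesseract binary is missing instead of failing quietly page by page during OCR.
try:
    print(f"Tesseract version detected: {pytesseract.get_tesseract_version()}")
except Exception as tesseract_err:
    print(f"⚠️ Tesseract does not appear to be installed: {tesseract_err}. "
          "OCR fallback will be skipped for scanned pages.")
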
# Process PDFs and extract pages
all_pages = process_pdfs_for_pages(pdf_folder, output_jsonl_pages)

# Chunk the pages
all_chunks = []
if all_pages:  # Only chunk if pages were processed
    all_chunks = chunk_pages(output_jsonl_pages, output_jsonl_chunks, chunk_size, chunk_overlap)

# Embed chunks into ChromaDB
collection = None  # Initialize the collection
if all_chunks:  # Only embed if chunks were created
    collection, embed_error = embed_chunks_into_chroma(output_jsonl_chunks, chroma_path, collection_name)
    if embed_error:
        print(f"Error during embedding: {embed_error}")

print("Initial setup complete.")
# ---------------------------
# 🖥️ Gradio interface
# ---------------------------
# Only define and launch the interface if the necessary components loaded
if collection is not None and pipe is not None:
    with gr.Blocks() as demo:
        gr.Markdown("""# 🧠 Manual QA via Hugging Face Llama 3.1
Ask a technical question and get answers from your own PDF manual database and a Hugging Face model.

**Note:** Initial startup may take a while to process the manuals and build the search index. Ensure your `Manuals` folder is uploaded and the `HF_TOKEN` secret is set in the Space settings.
""")
        with gr.Row():
            question = gr.Textbox(label="Your Question", placeholder="e.g. How do I access diagnostics on the SE3 console?")
        with gr.Row():
            model_filter_input = gr.Textbox(label="Filter by Model (Optional)", placeholder="e.g. se3hd")
            doc_type_filter_input = gr.Dropdown(
                label="Filter by Document Type (Optional)",
                choices=["owner manual", "service manual", "assembly instructions", "installer alert", "parts manual", "service bulletin", "unknown"],
                value=None,
                allow_custom_value=True
            )
        submit = gr.Button("🔍 Ask")
        answer = gr.Textbox(label="Answer", lines=10)  # Extra lines for readability
        # Call run_rag_qa when the button is clicked
        submit.click(
            fn=run_rag_qa,
            inputs=[question, model_filter_input, doc_type_filter_input],
            outputs=[answer]
        )
    # Launch the app; the standard Hugging Face Gradio Space template calls launch() in app.py
    demo.launch()
else:
    print("Gradio demo will not launch because RAG components (ChromaDB or HF Model) failed to load during setup.")
    # Optionally, a minimal Gradio interface could be defined here to surface the
    # error in the Space UI, e.g.:
    # with gr.Blocks() as error_demo:
    #     gr.Markdown("## Application Failed to Load")
    #     gr.Textbox(label="Error Details", value="RAG components (ChromaDB or HF Model) failed to initialize. Check logs and Space settings (HF_TOKEN, resources).", interactive=False)
    # error_demo.launch()