import os
import json
import fitz # PyMuPDF
import nltk
import chromadb
from tqdm import tqdm
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer, util
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import pytesseract
from PIL import Image
import io
import gradio as gr
# ---------------------------
# ⚙️ Configuration
# ---------------------------
pdf_folder = r"./Manuals" # Path relative to the app.py file in the Space
output_jsonl_pages = "manual_pages_with_ocr.jsonl"
output_jsonl_chunks = "manual_chunks_with_ocr.jsonl"
chroma_path = "./chroma_store"
collection_name = "manual_chunks"
chunk_size = 750
chunk_overlap = 100
MAX_CONTEXT_CHUNKS = 3 # Max chunks to send to the LLM
# Hugging Face Model Configuration
HF_MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
# Read HF Token from environment variable for security
HF_TOKEN = os.environ.get("HF_TOKEN") # Hugging Face Space secret name
# ---------------------------
# Ensure NLTK resources are available
# ---------------------------
# nltk.data.find raises LookupError when a resource is missing; newer NLTK
# releases also expect the 'punkt_tab' variant for sent_tokenize
for _resource in ("punkt", "punkt_tab"):
    try:
        nltk.data.find(f"tokenizers/{_resource}")
    except LookupError:
        nltk.download(_resource)
# ---------------------------
# 📄 Utility: Read PDF to text (with OCR fallback)
# ---------------------------
# This combines logic from extract_text_from_pdf and extract_text_from_page
def extract_text_from_page_with_ocr(page):
text = page.get_text().strip()
if text:
return text, False # native text found, no OCR needed
# If native text is missing, try OCR
try:
pix = page.get_pixmap(dpi=300)
img_data = pix.tobytes("png")
img = Image.open(io.BytesIO(img_data))
ocr_text = pytesseract.image_to_string(img).strip()
return ocr_text, True
except Exception as e:
print(f"OCR failed for a page: {e}")
return "", False # Return empty and indicate OCR was not used if it fails
# ---------------------------
# 🧹 Clean up lines (from original notebook)
# ---------------------------
def clean_text(text):
lines = text.splitlines()
lines = [line.strip() for line in lines if line.strip()]
return "\n".join(lines)
# ---------------------------
# ✂️ Sentence Tokenizer (from original notebook)
# ---------------------------
def tokenize_sentences(text):
return sent_tokenize(text)
# ---------------------------
# 📦 Chunk into fixed size blocks (from original notebook)
# ---------------------------
def split_into_chunks(sentences, max_tokens=750, overlap=100):
chunks = []
current_chunk = []
current_len = 0
for sentence in sentences:
token_count = len(sentence.split())
# Check if adding the next sentence exceeds max_tokens
# If it does, and the current chunk is not empty, save the current chunk
        if current_len + token_count > max_tokens and current_chunk:
            chunks.append(" ".join(current_chunk))
            # Start the next chunk with trailing sentences totalling at most `overlap` tokens
            overlap_chunk, overlap_len = [], 0
            for prev in reversed(current_chunk):
                prev_len = len(prev.split())
                if overlap_len + prev_len > overlap:
                    break
                overlap_chunk.insert(0, prev)
                overlap_len += prev_len
            current_chunk = overlap_chunk
            current_len = overlap_len
# Add the current sentence and update length
current_chunk.append(sentence)
current_len += token_count
# Add the last chunk if it's not empty
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
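# Illustrative sketch of how the chunker behaves (hypothetical sentences, tiny budgets
# chosen only to make the overlap visible; the app itself uses 750/100):
#
#   sents = tokenize_sentences("Power off the unit. Remove the rear cover. Check the cable.")
#   for c in split_into_chunks(sents, max_tokens=8, overlap=4):
#       print(c)
#
# Each printed chunk stays within the token budget, and the tail of one chunk is
# repeated at the start of the next so context is preserved across chunk boundaries.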
# ---------------------------
# 🧠 Extract Metadata from Filename (from original notebook)
# ---------------------------
def extract_metadata_from_filename(filename):
name = filename.lower().replace("_", " ").replace("-", " ")
metadata = {
"model": "unknown",
"doc_type": "unknown",
"brand": "life fitness" # Assuming 'life fitness' is constant based on your notebook
}
if "om" in name or "owner" in name:
metadata["doc_type"] = "owner manual"
elif "sm" in name or "service" in name:
metadata["doc_type"] = "service manual"
elif "assembly" in name:
metadata["doc_type"] = "assembly instructions"
elif "alert" in name:
metadata["doc_type"] = "installer alert"
elif "parts" in name:
metadata["doc_type"] = "parts manual"
elif "bulletin" in name:
metadata["doc_type"] = "service bulletin"
known_models = [
"se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl",
"everest", "engage", "inspire", "discover", "95t", "95x", "95c", "95r", "97c"
]
for model in known_models:
        # A simple substring check (ignoring spaces) is enough here; switch to regex if stricter matching is needed
if model.replace(" ", "") in name.replace(" ", ""):
metadata["model"] = model
break
return metadata
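# Example of the mapping this produces (hypothetical filename, for illustration only):
#   extract_metadata_from_filename("SE3HD_Service-Manual.pdf")
#   -> {"model": "se3hd", "doc_type": "service manual", "brand": "life fitness"}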
# ---------------------------
# 📄 Step 1: Process PDFs, Extract Pages with OCR
# ---------------------------
def process_pdfs_for_pages(pdf_folder, output_jsonl):
print("Starting PDF processing and OCR...")
all_pages = []
if not os.path.exists(pdf_folder):
print(f"Error: PDF folder not found at {pdf_folder}")
return [] # Return empty list if folder doesn't exist
pdf_files = [f for f in os.listdir(pdf_folder) if f.lower().endswith(".pdf")]
if not pdf_files:
print(f"No PDF files found in {pdf_folder}")
return []
for pdf_file in tqdm(pdf_files, desc="Scanning PDFs"):
path = os.path.join(pdf_folder, pdf_file)
try:
doc = fitz.open(path)
for page_num, page in enumerate(doc, start=1):
text, used_ocr = extract_text_from_page_with_ocr(page)
if text: # Only save pages with extracted text
all_pages.append({
"source_file": pdf_file,
"page": page_num,
"text": text,
"ocr_used": used_ocr
})
doc.close() # Close the document
except Exception as e:
print(f"Error processing {pdf_file}: {e}")
continue # Skip to the next file
with open(output_jsonl, "w", encoding="utf-8") as f:
for page in all_pages:
json.dump(page, f)
f.write("\n")
print(f"β
Saved {len(all_pages)} pages to {output_jsonl} (with OCR fallback)")
return all_pages # Return the list of pages
# ---------------------------
# 📄 Step 2: Chunk the Pages
# ---------------------------
def chunk_pages(input_jsonl, output_jsonl, chunk_size, chunk_overlap):
print("Starting page chunking...")
all_chunks = []
if not os.path.exists(input_jsonl):
print(f"Error: Input JSONL file not found at {input_jsonl}. Run PDF processing first.")
return []
try:
with open(input_jsonl, "r", encoding="utf-8") as f:
# Count lines for tqdm progress bar
total_lines = sum(1 for _ in f)
f.seek(0) # Reset file pointer to the beginning
for line in tqdm(f, total=total_lines, desc="Chunking pages"):
try:
page = json.loads(line)
source_file = page["source_file"]
page_number = page["page"]
text = page["text"]
metadata = extract_metadata_from_filename(source_file)
sentences = tokenize_sentences(clean_text(text)) # Clean and tokenize the page text
chunks = split_into_chunks(sentences, max_tokens=chunk_size, overlap=chunk_overlap)
for i, chunk in enumerate(chunks):
# Ensure chunk text is not empty
if chunk.strip():
all_chunks.append({
"source_file": source_file,
"chunk_id": f"{source_file}::page_{page_number}::chunk_{i+1}",
"page": page_number,
"ocr_used": page.get("ocr_used", False), # Use .get for safety
"model": metadata.get("model", "unknown"),
"doc_type": metadata.get("doc_type", "unknown"),
"brand": metadata.get("brand", "life fitness"),
"text": chunk.strip() # Ensure no leading/trailing whitespace
})
except json.JSONDecodeError:
print(f"Skipping invalid JSON line: {line}")
except Exception as e:
print(f"Error processing page from {line}: {e}")
continue # Continue with the next line
except Exception as e:
print(f"Error opening or reading input JSONL file: {e}")
return []
if not all_chunks:
print("No chunks were created.")
with open(output_jsonl, "w", encoding="utf-8") as f:
for chunk in all_chunks:
json.dump(chunk, f)
f.write("\n")
print(f"β
Done! {len(all_chunks)} chunks saved to {output_jsonl}")
return all_chunks # Return the list of chunks
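# Each JSONL record produced here looks like (illustrative values only):
#   {"source_file": "SE3HD_Service-Manual.pdf",
#    "chunk_id": "SE3HD_Service-Manual.pdf::page_12::chunk_1",
#    "page": 12, "ocr_used": false, "model": "se3hd",
#    "doc_type": "service manual", "brand": "life fitness", "text": "..."}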
# ---------------------------
# 📚 Step 3: Embed Chunks into Chroma
# ---------------------------
def embed_chunks_into_chroma(jsonl_path, chroma_path, collection_name):
print("Starting ChromaDB embedding...")
try:
embedder = SentenceTransformer("all-MiniLM-L6-v2")
embedder.eval()
print("β
SentenceTransformer model loaded.")
except Exception as e:
print(f"β Error loading SentenceTransformer model: {e}")
return None, "Error loading SentenceTransformer model."
try:
# Use a persistent client
client = chromadb.PersistentClient(path=chroma_path)
# Check if collection exists and delete if it does to rebuild
try:
client.get_collection(name=collection_name)
client.delete_collection(collection_name)
print(f"Deleted existing collection: {collection_name}")
except Exception: # Collection does not exist, which is fine
pass
collection = client.create_collection(name=collection_name)
print(f"β
ChromaDB collection '{collection_name}' created.")
except Exception as e:
print(f"β Error initializing ChromaDB: {e}")
return None, "Error initializing ChromaDB."
texts, metadatas, ids = [], [], []
batch_size = 16 # Define batch size for embedding
if not os.path.exists(jsonl_path):
print(f"Error: Input JSONL file not found at {jsonl_path}. Run chunking first.")
return None, "Input chunk file not found."
try:
with open(jsonl_path, "r", encoding="utf-8") as f:
# Count lines for tqdm progress bar
total_lines = sum(1 for _ in f)
f.seek(0) # Reset file pointer to the beginning
for line in tqdm(f, total=total_lines, desc="Embedding chunks"):
try:
item = json.loads(line)
texts.append(item.get("text", "")) # Use .get for safety
ids.append(item.get("chunk_id", f"unknown_{len(ids)}")) # Ensure chunk_id exists
# Prepare metadata, ensuring all keys are strings and handling potential missing keys
metadata = {str(k): str(v) for k, v in item.items() if k != "text"}
metadatas.append(metadata)
if len(texts) >= batch_size:
embeddings = embedder.encode(texts).tolist()
collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)
texts, metadatas, ids = [], [], [] # Reset batches
except json.JSONDecodeError:
print(f"Skipping invalid JSON line during embedding: {line}")
except Exception as e:
print(f"Error processing chunk line {line} during embedding: {e}")
continue # Continue with the next line
# Add any remaining items in the last batch
if texts:
embeddings = embedder.encode(texts).tolist()
collection.add(documents=texts, metadatas=metadatas, ids=ids, embeddings=embeddings)
print("β
All OCR-enhanced chunks embedded in Chroma!")
return collection, None # Return collection and no error
except Exception as e:
print(f"β Error reading input JSONL file for embedding: {e}")
return None, "Error reading input file for embedding."
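# Note: chromadb.PersistentClient persists the index under `chroma_path`, so a restart
# could reuse the existing collection instead of re-embedding everything. A minimal
# sketch, assuming the on-disk store still matches the current chunk file:
#
#   client = chromadb.PersistentClient(path=chroma_path)
#   try:
#       collection = client.get_collection(name=collection_name)
#   except Exception:
#       collection, _ = embed_chunks_into_chroma(output_jsonl_chunks, chroma_path, collection_name)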
# ---------------------------
# 🧠 Load Hugging Face Model and Tokenizer
# ---------------------------
# This needs to happen after imports but before the Gradio interface
tokenizer = None
model = None
pipe = None
print(f"Attempting to load Hugging Face model: {HF_MODEL_ID}")
print(f"Using HF_TOKEN (present: {HF_TOKEN is not None})")
if not HF_TOKEN:
print("β HF_TOKEN environment variable not set. Cannot load Hugging Face model.")
else:
try:
# Check if CUDA is available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# Load tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_ID, token=HF_TOKEN)
        model = AutoModelForCausalLM.from_pretrained(
            HF_MODEL_ID,
            token=HF_TOKEN,
            torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,  # bfloat16 on GPU, float32 on CPU
            device_map="auto" if torch.cuda.is_available() else None  # let accelerate place layers on the GPU
        )
        if not torch.cuda.is_available():
            model = model.to(device)  # no device_map on CPU, so move the model explicitly
        # Create a pipeline for easy inference. The model is already placed on the
        # correct device above, so no `device` argument is passed to the pipeline.
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,
            temperature=0.1,
            top_p=0.9,
            do_sample=True
        )
print(f"β
Successfully loaded Hugging Face model: {HF_MODEL_ID} on {device}")
except Exception as e:
print(f"β Error loading Hugging Face model: {e}")
print("Please ensure:")
print("- The HF_TOKEN secret is set in your Hugging Face Space settings.")
print("- Your Space has sufficient resources (GPU, RAM) for the model.")
print("- You have accepted the model's terms on Hugging Face (if required).")
tokenizer, model, pipe = None, None, None # Set to None if loading fails
# ---------------------------
# 🔍 Query Function (Uses Embedder and Chroma)
# ---------------------------
# Load the same embedding model used for indexing so query_manuals() can encode
# questions and rerank results (it must not stay None, or every query would be skipped).
embedder = SentenceTransformer("all-MiniLM-L6-v2")
def query_manuals(question, model_filter=None, doc_type_filter=None, top_k=5, rerank_keywords=None):
global embedder # Access the global embedder variable
if collection is None or embedder is None:
print("β οΈ ChromaDB or Embedder not loaded. Cannot perform vector search.")
return [] # Return empty if Chroma or Embedder is not loaded
where_filter = {}
if model_filter:
where_filter["model"] = model_filter.lower()
if doc_type_filter:
where_filter["doc_type"] = doc_type_filter.lower()
    # ChromaDB expects `where=None` when there is no filter (an empty dict is rejected
    # by newer versions), and multiple conditions must be wrapped in an "$and" clause.
    if len(where_filter) > 1:
        where_filter = {"$and": [{k: v} for k, v in where_filter.items()]}
    results = collection.query(
        query_texts=[question],
        n_results=top_k * 5,  # fetch more candidates for reranking
        where=where_filter if where_filter else None
    )
if not results or not results.get("documents") or not results["documents"][0]:
return [] # No matches
try:
question_embedding = embedder.encode(question, convert_to_tensor=True)
except Exception as e:
print(f"Error encoding question: {e}")
return [] # Return empty if embedding fails
    # Rerank: combine semantic similarity with a simple keyword-overlap score
reranked = []
# Ensure results["documents"] and results["metadatas"] are not empty before iterating
if results.get("documents") and results["documents"][0]:
for i, text in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i]
# Handle potential encoding errors during text embedding
try:
embedding = embedder.encode(text, convert_to_tensor=True)
# Semantic similarity
similarity_score = float(util.cos_sim(question_embedding, embedding))
except Exception as e:
print(f"Error encoding chunk text for reranking: {e}. Skipping chunk.")
continue # Skip this chunk if encoding fails
# Keyword score
keyword_score = 0
if rerank_keywords and text: # Ensure text is not None or empty
for kw in rerank_keywords:
if kw.lower() in text.lower():
keyword_score += 1
                # Combine with tunable weights: semantic similarity dominates,
                # and each keyword hit adds a small boost (keyword_score is a raw count)
                final_score = (0.8 * similarity_score) + (0.2 * keyword_score)
reranked.append({
"score": final_score,
"text": text,
"metadata": meta
})
# Sort by combined score
reranked.sort(key=lambda x: x["score"], reverse=True)
return reranked[:top_k]
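# Illustrative call (hypothetical question and filters; assumes the index has been built):
#   hits = query_manuals("How do I enter the diagnostics menu?", model_filter="se3hd",
#                        doc_type_filter="service manual", top_k=3,
#                        rerank_keywords=["diagnostic", "menu"])
#   for h in hits:
#       print(round(h["score"], 3), h["metadata"].get("chunk_id"))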
# ---------------------------
# 💬 Ask Hugging Face Model
# ---------------------------
def ask_hf_model(prompt):
if pipe is None:
return "Hugging Face model not loaded. Cannot generate response."
try:
# Use the Llama 3.1 chat template
messages = [
{"role": "system", "content": "You are a technical assistant trained to answer questions using equipment manuals. Use only the provided context to answer the question. If the answer is not clearly in the context, reply: 'I don't know.'"},
{"role": "user", "content": prompt}
]
# Apply chat template and generate text
prompt_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
outputs = pipe(
prompt_text,
do_sample=True,
temperature=0.1, # Keep temperature low for more factual answers
top_p=0.9,
max_new_tokens=512,
pad_token_id=tokenizer.eos_token_id # Set pad_token_id for generation
)
        # The pipeline output contains the prompt followed by the completion,
        # so strip the prompt prefix to keep only the newly generated answer.
        generated_text = outputs[0]["generated_text"]
        response = generated_text[len(prompt_text):].strip()
# Remove any trailing EOS tokens or similar artifacts
if response.endswith(tokenizer.eos_token):
response = response[:-len(tokenizer.eos_token)].strip()
return response
except Exception as e:
return f"β Error generating response from Hugging Face model: {str(e)}"
# ---------------------------
# 🎯 Full RAG Pipeline
# ---------------------------
def run_rag_qa(user_question, model_filter=None, doc_type_filter=None): # Added filters as optional inputs
# Ensure ChromaDB and the HF model pipeline are loaded before proceeding
if collection is None:
return "ChromaDB is not loaded. Ensure PDFs are in ./Manuals and the app started correctly."
if pipe is None:
return "Hugging Face model pipeline is not loaded. Ensure HF_TOKEN is set and the model loaded successfully."
results = query_manuals(
question=user_question,
model_filter=model_filter, # Use the optional filter inputs
doc_type_filter=doc_type_filter,
top_k=MAX_CONTEXT_CHUNKS,
rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"] # Example keywords
)
if not results:
# Attempt a broader search if initial filter yields no results
if model_filter or doc_type_filter:
print("No results with specified filters, trying broader search...")
results = query_manuals(
question=user_question,
model_filter=None, # Remove filters for broader search
doc_type_filter=None,
top_k=MAX_CONTEXT_CHUNKS,
rerank_keywords=["diagnostic", "immobilize", "system", "screen", "service", "error"]
)
if not results:
return "No relevant documents found for the query, even with broader search."
else:
return "No relevant documents found for the query."
context = "\n\n".join([f"Source File: {r['metadata'].get('source_file', 'N/A')}, Page: {r['metadata'].get('page', 'N/A')}\nText: {r['text'].strip()}" for r in results])
prompt = f"""
Context:
{context}
Question: {user_question}
"""
return ask_hf_model(prompt)
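# Example end-to-end call (hypothetical question): run_rag_qa("What does error code 3 mean?",
# model_filter="95t") returns an answer string grounded in the retrieved chunks, or an
# explanatory message if the index or the model pipeline is unavailable.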
# ---------------------------
# --- Initial Setup ---
# This code runs when the app starts on Hugging Face Spaces
# It processes PDFs, chunks, and builds the ChromaDB
# ---------------------------
print("Starting initial setup...")
# Tesseract must be installed on the system for pytesseract to work. On Hugging Face
# Spaces, add it via packages.txt (or a Dockerfile) rather than shelling out with "!" from
# app.py. If the tesseract binary is missing, the OCR fallback will fail for image-only pages.
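# A minimal sketch of the Space-level dependencies this script assumes (adjust names and
# versions to your environment; these are the usual apt / PyPI package names):
#
#   packages.txt      tesseract-ocr
#   requirements.txt  pymupdf nltk chromadb tqdm sentence-transformers transformers
#                     accelerate torch pytesseract pillow gradio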
# Process PDFs and extract pages
all_pages = process_pdfs_for_pages(pdf_folder, output_jsonl_pages)
# Chunk the pages
all_chunks = []
if all_pages: # Only chunk if pages were processed
all_chunks = chunk_pages(output_jsonl_pages, output_jsonl_chunks, chunk_size, chunk_overlap)
# Embed chunks into ChromaDB
collection = None # Initialize collection
if all_chunks: # Only embed if chunks were created
collection, embed_error = embed_chunks_into_chroma(output_jsonl_chunks, chroma_path, collection_name)
if embed_error:
print(f"Error during embedding: {embed_error}")
print("Initial setup complete.")
# ---------------------------
# 🖥️ Gradio Interface
# ---------------------------
# Only define and launch the interface if the necessary components loaded
if collection is not None and pipe is not None:
with gr.Blocks() as demo:
        gr.Markdown("""# 🧠 Manual QA via Hugging Face Llama 3.1
Ask a technical question and get answers using your own PDF manual database and a Hugging Face model.
**Note:** Initial startup might take time to process manuals and build the search index. Ensure your `Manuals` folder is uploaded and the `HF_TOKEN` secret is set in Space settings.
""")
with gr.Row():
question = gr.Textbox(label="Your Question", placeholder="e.g. How do I access diagnostics on the SE3 console?")
with gr.Row():
model_filter_input = gr.Textbox(label="Filter by Model (Optional)", placeholder="e.g. se3hd")
doc_type_filter_input = gr.Dropdown(label="Filter by Document Type (Optional)", choices=["owner manual", "service manual", "assembly instructions", "installer alert", "parts manual", "service bulletin", "unknown", None], value=None, allow_custom_value=True)
        submit = gr.Button("🔍 Ask")
answer = gr.Textbox(label="Answer", lines=10) # Increased lines for better readability
# Call the run_rag_qa function when the button is clicked
submit.click(
fn=run_rag_qa,
inputs=[question, model_filter_input, doc_type_filter_input],
outputs=[answer]
)
    # Launch the Gradio app (standard for Gradio SDK Spaces).
    demo.launch()
else:
print("Gradio demo will not launch because RAG components (ChromaDB or HF Model) failed to load during setup.")
# You could add a simple Gradio interface here to show an error message
# if you wanted to provide user feedback in the Space UI even on failure.
# Example:
# with gr.Blocks() as error_demo:
# gr.Markdown("## Application Failed to Load")
# gr.Textbox(label="Error Details", value="RAG components (ChromaDB or HF Model) failed to initialize. Check logs and Space settings (HF_TOKEN, resources).", interactive=False)
# error_demo.launch() |