Spaces:
Running
Running
import os | |
import fitz # PyMuPDF | |
import docx | |
import json | |
import gradio as gr | |
import pytesseract | |
from PIL import Image | |
from tqdm import tqdm | |
import chromadb | |
import torch | |
import nltk | |
from sentence_transformers import SentenceTransformer, util | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
# ----------------------------
# Ensure NLTK sentence-tokenizer data is available
# ----------------------------
# Older NLTK releases read the pickled "punkt" resource, while recent releases
# load the Punkt sentence models from "punkt_tab". Ensure both are present so
# sent_tokenize() works whichever NLTK version is installed; otherwise the
# LookupError it raises would be swallowed by split_sentences() and sentence
# splitting would silently degrade to the naive ". " fallback.
for _resource in ("punkt", "punkt_tab"):
    try:
        nltk.data.find(f"tokenizers/{_resource}")
    except LookupError:
        nltk.download(_resource)
from nltk.tokenize import sent_tokenize
# ----------------------------
# Config
# ----------------------------
# Filesystem locations: the folder scanned for manuals and the folder handed
# to the persistent Chroma client.
MANUAL_DIR = "./Manuals"
CHROMA_DIR = "./chroma_store"

# Chunking limits read by chunk_text().
CHUNK_SIZE = 750
CHUNK_OVERLAP = 100

# Number of chunks requested per question in run_query().
MAX_CONTEXT = 3

# Model ids offered in the UI dropdown; the first entry is the default.
# NOTE(review): these ids are passed verbatim to from_pretrained() -- confirm
# each one resolves to an existing Hugging Face Hub repository.
MODEL_OPTIONS = [
    "meta-llama/Llama-3-8b-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-1.1-7b-it",
]
DEFAULT_MODEL = MODEL_OPTIONS[0]

# Access token taken from the environment; None when HF_TOKEN is unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# ----------------------------
# Utility functions
# ----------------------------
def extract_pdf_text(path):
    """Extract the text of every page of a PDF, using OCR for pages without text.

    Args:
        path: Filesystem path of the PDF to open.

    Returns:
        A list with one ``{"page": <1-based page number>, "text": <str>}``
        dict per page, in document order.
    """
    # Imported here because the module's import block does not provide ``io``;
    # without it the OCR branch below raised NameError on the first scanned page.
    import io

    text_blocks = []
    # The context manager closes the document even if extraction or OCR fails.
    with fitz.open(path) as doc:
        for page_number, page in enumerate(doc, start=1):
            text = page.get_text()
            if not text.strip():
                # No extractable text on this page: render it to PNG and OCR it.
                png_bytes = page.get_pixmap().tobytes("png")
                with Image.open(io.BytesIO(png_bytes)) as img:
                    text = pytesseract.image_to_string(img)
            text_blocks.append({"page": page_number, "text": text})
    return text_blocks
def extract_docx_text(path):
    """Read a Word document and return its paragraph text as one pseudo-page.

    Args:
        path: Filesystem path of the ``.docx`` file.

    Returns:
        A single-element list ``[{"page": 1, "text": <str>}]`` where the text
        is every paragraph's text joined with newlines.
    """
    document = docx.Document(path)
    paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
    # The whole document is reported as page 1.
    return [{"page": 1, "text": "\n".join(paragraph_texts)}]
def split_sentences(text):
    """Split ``text`` into a list of sentences.

    Uses ``sent_tokenize``; if that call raises for any reason, falls back to
    splitting the text on the literal separator ``". "``.

    Args:
        text: The string to split.

    Returns:
        A list of sentence strings.
    """
    try:
        sentences = sent_tokenize(text)
    except Exception:
        # Best-effort fallback so a tokenizer failure never aborts indexing.
        sentences = text.split(". ")
    return sentences
def chunk_text(sentences, chunk_size=None, chunk_overlap=None):
    """Group sentences into word-bounded chunks with a trailing-sentence overlap.

    Sentences are never split. A chunk is closed when adding the next sentence
    would push its word count past ``chunk_size``; the next chunk then starts
    with as many trailing sentences of the closed chunk as fit in
    ``chunk_overlap`` words (and still leave room for the incoming sentence).
    A single sentence longer than ``chunk_size`` becomes its own chunk.

    Args:
        sentences: Iterable of sentence strings, in reading order.
        chunk_size: Maximum words per chunk. Defaults to ``CHUNK_SIZE``.
        chunk_overlap: Maximum words repeated from the previous chunk.
            Defaults to ``CHUNK_OVERLAP``.

    Returns:
        A list of chunk strings (sentences joined by single spaces); empty
        when ``sentences`` is empty.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    overlap = CHUNK_OVERLAP if chunk_overlap is None else chunk_overlap

    chunks = []
    current = []  # (sentence, word_count) pairs of the chunk being built
    count = 0  # total words in ``current``
    for sentence in sentences:
        n_words = len(sentence.split())
        # ``current`` must be non-empty, otherwise an oversize first sentence
        # would emit an empty chunk.
        if current and count + n_words > size:
            chunks.append(" ".join(text for text, _ in current))
            # Overlap is measured in words (the same unit as the size limit),
            # and is capped so overlap + incoming sentence stays within size.
            budget = min(overlap, size - n_words)
            carried = []
            carried_words = 0
            for item in reversed(current):
                if carried_words + item[1] > budget:
                    break
                carried.append(item)
                carried_words += item[1]
            carried.reverse()
            current, count = carried, carried_words
        current.append((sentence, n_words))
        count += n_words
    if current:
        chunks.append(" ".join(text for text, _ in current))
    return chunks
def embed_all():
    """Rebuild the ``manual_chunks`` Chroma collection from the files in ``MANUAL_DIR``.

    Any existing ``manual_chunks`` collection is deleted and recreated. Every
    ``.pdf`` and ``.docx`` file in ``MANUAL_DIR`` is extracted page by page,
    split into sentences, chunked, and added to the collection with
    ``source`` (file name) and ``page`` metadata. Other files are ignored.

    Returns:
        A ``(collection, embedder)`` tuple: the populated Chroma collection
        and a ``SentenceTransformer("all-MiniLM-L6-v2")`` instance.
    """
    client = chromadb.PersistentClient(path=CHROMA_DIR)
    # list_collections() yields Collection objects in some chromadb releases
    # and plain name strings in others; normalise so both shapes work.
    existing = {getattr(entry, "name", entry) for entry in client.list_collections()}
    if "manual_chunks" in existing:
        client.delete_collection("manual_chunks")
    collection = client.create_collection("manual_chunks")
    # Not used for indexing: no embeddings are passed to collection.add(), so
    # Chroma computes them itself. The instance is only returned to the caller.
    embedder = SentenceTransformer("all-MiniLM-L6-v2")

    # Sorted so the indexing order does not depend on the filesystem.
    for fname in sorted(os.listdir(MANUAL_DIR)):
        fpath = os.path.join(MANUAL_DIR, fname)
        lowered = fname.lower()
        if lowered.endswith(".pdf"):
            pages = extract_pdf_text(fpath)
        elif lowered.endswith(".docx"):
            pages = extract_docx_text(fpath)
        else:
            continue
        for page in pages:
            chunks = chunk_text(split_sentences(page["text"]))
            # Keep the original chunk index in the id while dropping blank
            # chunks, which would only add noise to retrieval.
            kept = [(idx, chunk) for idx, chunk in enumerate(chunks) if chunk.strip()]
            if not kept:
                # Nothing to index for this page; skip the add() call entirely.
                continue
            # One add() per page instead of one per chunk.
            collection.add(
                documents=[chunk for _, chunk in kept],
                ids=[f"{fname}::p{page['page']}::c{idx}" for idx, _ in kept],
                metadatas=[{"source": fname, "page": page["page"]} for _ in kept],
            )
    return collection, embedder
# Most recently built pipeline, keyed by model id. Holds at most one entry so
# only one set of model weights is kept in memory at a time.
_PIPELINE_CACHE = {}


def get_model(model_id):
    """Return a text-generation pipeline for ``model_id``, reusing the last one loaded.

    Loading tokenizer and weights is by far the most expensive step of
    answering a question, so the pipeline for the most recently requested
    model is kept and returned again on subsequent calls with the same id.

    Args:
        model_id: Model identifier passed to ``from_pretrained``.

    Returns:
        A ``transformers`` ``text-generation`` pipeline built with float32
        weights and ``device=-1``.
    """
    cached = _PIPELINE_CACHE.get(model_id)
    if cached is not None:
        return cached

    # Release the previously loaded model before loading a different one so
    # two models are never resident at once.
    _PIPELINE_CACHE.clear()
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(
        model_id, token=HF_TOKEN, torch_dtype=torch.float32
    )
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)
    _PIPELINE_CACHE[model_id] = generator
    return generator
def run_query(question, model_name):
    """Answer ``question`` with an LLM, using retrieved manual chunks as context.

    Queries the module-level ``db`` collection for up to ``MAX_CONTEXT``
    chunks, builds a prompt around them and generates up to 300 new tokens
    with the pipeline returned by ``get_model(model_name)``.

    Args:
        question: The user's question text.
        model_name: Model identifier forwarded to ``get_model``.

    Returns:
        The generated answer text, or a short message when the question is
        blank or no chunks were retrieved.
    """
    # Do not spend a retrieval and a generation on an empty prompt.
    if not question or not question.strip():
        return "Please enter a question."

    results = db.query(query_texts=[question], n_results=MAX_CONTEXT)
    documents = (results or {}).get("documents") or []
    # "documents" holds one list of hits per query text, so "no hits" shows up
    # as an empty inner list, which the outer truthiness check alone misses.
    hits = documents[0] if documents else []
    if not hits:
        return "No matching information found."

    context = "\n\n".join(hits)
    prompt = (
        "\n"
        "You are a helpful assistant. Use the following context to answer the question.\n"
        "Context:\n"
        f"{context}\n"
        f"Question: {question}\n"
        "Answer:\n"
    )
    generator = get_model(model_name)
    # Ask for the completion only. Splitting the full text on "Answer:" would
    # cut off any answer that itself contains that marker.
    outputs = generator(prompt, max_new_tokens=300, return_full_text=False)
    return outputs[0]["generated_text"].strip()
# ----------------------------
# Startup: embed manuals
# ----------------------------
# Build the index once at import time; run_query() reads the module-level ``db``.
db, embedder = embed_all()

# ----------------------------
# Gradio UI
# ----------------------------
# Page header rendered at the top of the app.
_HEADER_MARKDOWN = (
    "\n"
    "# π SmartManuals-AI (Docker)\n"
    "Ask any question from the preloaded manuals (PDF + Word).\n"
)

with gr.Blocks() as demo:
    gr.Markdown(_HEADER_MARKDOWN)
    # NOTE(review): widget nesting reconstructed from a flattened source --
    # confirm that only the question box and model dropdown share the row.
    with gr.Row():
        question = gr.Textbox(label="Ask a Question")
        model = gr.Dropdown(choices=MODEL_OPTIONS, value=DEFAULT_MODEL, label="Choose LLM")
    btn = gr.Button("Ask")
    answer = gr.Textbox(label="Answer", lines=10)
    # Clicking the button sends (question, model) to run_query and shows its
    # return value in the answer box.
    btn.click(run_query, [question, model], answer)

# Listen on all interfaces, port 7860.
demo.launch(server_name="0.0.0.0", server_port=7860)