Spaces:
Running
Running
import os | |
import gradio as gr | |
import fitz # PyMuPDF | |
import faiss | |
import numpy as np | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer | |
from auto_gptq import AutoGPTQForCausalLM | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from huggingface_hub import login | |
# Authenticate | |
hf_token = os.environ.get("HUGGINGFACE_TOKEN") | |
if not hf_token: | |
raise ValueError("Hugging Face token not found.") | |
login(token=hf_token) | |
# Load embedding model | |
embed_model = SentenceTransformer("BAAI/bge-base-en-v1.5") | |
# Load 4-bit quantized Mistral model | |
model_id = "TheBloke/Mistral-7B-Instruct-v0.1-GPTQ" | |
tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True) | |
model = AutoGPTQForCausalLM.from_quantized( | |
model_id, | |
use_safetensors=True, | |
trust_remote_code=True, | |
device_map="auto" | |
) | |
# Internal state | |
index = None | |
doc_texts = [] | |
# PDF/TXT text extraction | |
def extract_text(file): | |
try: | |
text = "" | |
file_path = file.name if hasattr(file, 'name') else file | |
if file_path.endswith(".pdf"): | |
with fitz.open(file_path) as doc: | |
for page in doc: | |
text += page.get_text() | |
elif file_path.endswith(".txt"): | |
with open(file_path, "r", encoding="utf-8") as f: | |
text = f.read() | |
else: | |
return "β Unsupported file type." | |
return text | |
except Exception as e: | |
return f"β Error extracting text: {e}" | |
# Preprocess and embed | |
def process_file(file): | |
global index, doc_texts | |
try: | |
text = extract_text(file) | |
if text.startswith("β"): | |
return text | |
text = text[:15000] # Limit size | |
splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50) | |
doc_texts = splitter.split_text(text) | |
if not doc_texts: | |
return "β Document could not be split." | |
embeddings = embed_model.encode(doc_texts, convert_to_numpy=True) | |
dim = embeddings.shape[1] | |
index = faiss.IndexFlatL2(dim) | |
index.add(embeddings) | |
return "β Document processed. Ask your question below." | |
except Exception as e: | |
return f"β Error processing file: {e}" | |
# Generate answer using context | |
def generate_answer(question): | |
global index, doc_texts | |
try: | |
if index is None or not doc_texts: | |
return "β οΈ Please upload and process a document first." | |
question_emb = embed_model.encode([question], convert_to_numpy=True) | |
_, I = index.search(question_emb, k=3) | |
context = "\n".join([doc_texts[i] for i in I[0]]) | |
prompt = ( | |
f"You are a helpful assistant. Use the context below to answer clearly.\n\n" | |
f"Context:\n{context}\n\n" | |
f"Question: {question}\n\n" | |
f"Answer:" | |
) | |
inputs = tokenizer(prompt, return_tensors="pt").to(model.device) | |
output = model.generate( | |
**inputs, | |
max_new_tokens=150, | |
do_sample=True, | |
temperature=0.7, | |
top_k=50, | |
top_p=0.95 | |
) | |
answer = tokenizer.decode(output[0], skip_special_tokens=True) | |
return answer.split("Answer:")[-1].strip() | |
except Exception as e: | |
return f"β Error generating answer: {e}" | |
# Gradio UI | |
with gr.Blocks(title="π Document Q&A (Mistral 4-bit)") as demo: | |
gr.Markdown("<h1 style='text-align: center;'>π Document Q&A with Mistral 4-bit</h1>") | |
gr.Markdown("Upload a PDF or TXT and ask questions. Powered by Mistral-7B GPTQ.") | |
with gr.Row(): | |
file_input = gr.File(label="Upload Document", file_types=[".pdf", ".txt"]) | |
upload_output = gr.Textbox(label="Upload Status") | |
with gr.Row(): | |
question_input = gr.Textbox(label="Ask a Question", placeholder="e.g. What is this document about?") | |
answer_output = gr.Textbox(label="Answer") | |
file_input.change(fn=process_file, inputs=file_input, outputs=upload_output) | |
question_input.submit(fn=generate_answer, inputs=question_input, outputs=answer_output) | |
demo.launch(show_error=True) | |