# Hugging Face Space (status: Running)
import os | |
import gradio as gr | |
import fitz | |
import faiss | |
import numpy as np | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer, pipeline | |
from transformers import BitsAndBytesConfig, AutoModelForCausalLM | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from huggingface_hub import login | |
# Authenticate against the Hugging Face Hub. The token is read from the
# HUGGINGFACE_TOKEN environment variable; start-up is aborted when it is
# missing or empty.
hf_token = os.getenv("HUGGINGFACE_TOKEN")
if hf_token is None or hf_token == "":
    raise ValueError("Hugging Face token not found.")
login(token=hf_token)
# Sentence-embedding model used to vectorise both document chunks and
# incoming questions.
embed_model = SentenceTransformer("BAAI/bge-base-en-v1.5")

# Mistral 7B Instruct, loaded through bitsandbytes with 8-bit weights.
model_id = "mistralai/Mistral-7B-Instruct-v0.1"
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    llm_int8_threshold=6.0,
    llm_int8_skip_modules=None,
)

# Tokenizer and model are fetched with the same Hub token used for login.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    token=hf_token,
    device_map="auto",
    quantization_config=bnb_config,
)

# Text-generation pipeline wrapping the quantized model; this is the object
# the answer generator calls.
llm = pipeline(task="text-generation", tokenizer=tokenizer, model=model)
# Module-level retrieval state shared by process_file and generate_answer:
#   doc_texts - the text chunks of the current document (empty until one is
#               processed)
#   index     - the FAISS index built over those chunks (None until then)
doc_texts = []
index = None
# Extract text | |
def extract_text(file) -> str:
    """Return the plain text of a PDF or TXT file.

    Args:
        file: A filesystem path (``str`` or ``os.PathLike``), or any object
            that exposes the path through a ``name`` attribute.

    Returns:
        The extracted text on success. On failure, a human-readable message
        beginning with the error marker "β"; no exception is propagated, so
        callers detect failures by that prefix.
    """
    try:
        file_path = file.name if hasattr(file, "name") else file
        # Match the extension case-insensitively so "REPORT.PDF" and
        # "notes.TXT" are accepted just like their lowercase spellings.
        lowered = os.fspath(file_path).lower()
        if lowered.endswith(".pdf"):
            with fitz.open(file_path) as doc:
                # Join once instead of growing a string page by page.
                return "".join(page.get_text() for page in doc)
        if lowered.endswith(".txt"):
            # "utf-8-sig" reads plain UTF-8 and also drops a leading BOM,
            # which would otherwise end up inside the first chunk.
            with open(file_path, "r", encoding="utf-8-sig") as f:
                return f.read()
        return "β Unsupported file type."
    except Exception as e:
        # Deliberate catch-all: this function reports every failure as a
        # message string rather than raising.
        return f"β Error extracting text: {e}"
# Process file | |
def process_file(file) -> str:
    """Extract, chunk, embed and index an uploaded document.

    The module-level ``index`` and ``doc_texts`` are cleared first and only
    replaced, together, once the new index has been fully built. They can
    therefore never disagree with each other, and a failed or cleared upload
    does not leave an earlier document answerable.

    Args:
        file: The uploaded file (a path or an object with a ``name``
            attribute), or ``None`` when no file is selected.

    Returns:
        A status message for the UI. Failures are reported as messages, not
        raised.
    """
    global index, doc_texts

    # Start from a clean slate so every early return below leaves the state
    # consistently empty.
    index, doc_texts = None, []

    if file is None:
        # NOTE(review): Gradio's File component is expected to pass None when
        # the upload is cleared - confirm against the installed version.
        return "No file selected. Upload a PDF or TXT file to begin."

    try:
        text = extract_text(file)
        # extract_text signals failure with a message starting with "β".
        if text.startswith("β"):
            return text

        # Trim large documents: only the first 15,000 characters are indexed.
        text = text[:15000]

        splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
        chunks = splitter.split_text(text)
        if not chunks:
            return "β Could not split document."

        embeddings = embed_model.encode(chunks, convert_to_numpy=True)
        new_index = faiss.IndexFlatL2(embeddings.shape[1])
        new_index.add(embeddings)

        # Publish both pieces of state in one step, after all fallible work.
        index, doc_texts = new_index, chunks
        return "β Document processed. You may ask your question below."
    except Exception as e:
        return f"β Error processing file: {e}"
# Answer generator | |
def generate_answer(question) -> str:
    """Answer a question using the most similar chunks of the indexed document.

    Args:
        question: The user's question as free text.

    Returns:
        The generated answer, or a message asking for a question or a
        document, or an error message. Failures are reported as messages,
        not raised.
    """
    # Reject empty input before doing any embedding or generation work.
    if not question or not question.strip():
        return "Please enter a question."

    try:
        if index is None or not doc_texts:
            return "β οΈ Please upload and process a document first."

        question_emb = embed_model.encode([question], convert_to_numpy=True)

        # Never request more neighbours than there are chunks: FAISS pads
        # missing results with -1, and doc_texts[-1] would silently pull in
        # the last chunk instead of failing.
        k = min(3, len(doc_texts))
        _, neighbours = index.search(question_emb, k=k)
        context = "\n".join(
            doc_texts[int(i)] for i in neighbours[0] if 0 <= int(i) < len(doc_texts)
        )

        prompt = (
            "You are a helpful assistant. Use the context below to answer the question clearly.\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {question}\n\n"
            "Answer:"
        )

        # return_full_text=False yields only the newly generated text, so the
        # answer no longer has to be cut out of the echoed prompt (which
        # truncated answers that themselves contained "Answer:").
        result = llm(
            prompt,
            max_new_tokens=200,
            do_sample=True,
            temperature=0.7,
            return_full_text=False,
        )
        return result[0]["generated_text"].strip()
    except Exception as e:
        return f"β Error generating answer: {e}"
# Gradio UI: one row for uploading a document and showing its processing
# status, one row for asking a question and showing the answer.
with gr.Blocks(title="π Document Q&A Assistant") as demo:
    gr.Markdown("<h1 style='text-align: center;'>π Document AI Assistant</h1>")
    gr.Markdown(
        "Upload a PDF or TXT file, and ask questions about its content. "
        "The assistant uses Mistral 7B (quantized) for reasoning."
    )

    with gr.Row():
        file_input = gr.File(
            label="Upload PDF or TXT",
            file_types=[".pdf", ".txt"],
        )
        upload_output = gr.Textbox(label="Upload Status")

    with gr.Row():
        question_input = gr.Textbox(
            label="Ask a Question",
            placeholder="e.g. What is the summary?",
        )
        answer_output = gr.Textbox(label="Answer")

    # Processing runs whenever the file selection changes; answering runs
    # when the question box is submitted.
    file_input.change(
        fn=process_file,
        inputs=[file_input],
        outputs=[upload_output],
    )
    question_input.submit(
        fn=generate_answer,
        inputs=[question_input],
        outputs=[answer_output],
    )

demo.launch(share=False, show_error=True)