|
import gradio as gr |
|
import os |
|
import re |
|
import PyPDF2 |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import json |
|
|
|
class PDFAnalyzer:
    """Semantic question-answering over a single uploaded PDF.

    Workflow: :meth:`process_pdf` extracts the document text, splits it
    into fixed-size character chunks and embeds each chunk with a
    sentence-transformers model; :meth:`query` embeds a question and
    returns the most cosine-similar chunk. Both public methods return
    JSON strings shaped like a simple HTTP-style API response.
    """

    # Character length of each text chunk fed to the embedder.
    CHUNK_SIZE = 500

    def __init__(self):
        self.text_chunks = []   # list[str]: chunked document text
        self.embeddings = None  # numpy array, one row per chunk (or None)
        self.active_doc = None  # basename of the loaded PDF (or None)
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def process_pdf(self, filepath):
        """Extract, chunk and embed *filepath*.

        Returns a JSON string with ``status`` 200 on success or 500 on
        failure. On failure any previously loaded document is left
        untouched so earlier queries keep working.
        """
        try:
            text = self._extract_text(filepath)
            chunks = self._chunk_text(text)
            if not chunks:
                # Scanned / image-only PDFs yield no extractable text;
                # report failure instead of leaving an un-queryable state.
                return json.dumps({
                    "status": 500,
                    "error": "no extractable text in document",
                    "message": "Document processing failed",
                })
            self.text_chunks = chunks
            self.embeddings = self.model.encode(chunks)
            self.active_doc = os.path.basename(filepath)
            return json.dumps({
                "status": 200,
                "message": f"Document {self.active_doc} processed successfully",
                "document_id": hash(self.active_doc),
            })
        except Exception as e:
            return json.dumps({
                "status": 500,
                "error": str(e),
                "message": "Document processing failed",
            })

    def _extract_text(self, filepath):
        """Return the concatenated text of every page in the PDF.

        ``PageObject.extract_text`` may return ``None`` for pages with no
        extractable text, so substitute '' to keep the join from raising.
        """
        with open(filepath, 'rb') as f:
            return ''.join(
                page.extract_text() or ''
                for page in PyPDF2.PdfReader(f).pages
            )

    def _chunk_text(self, text):
        """Split *text* into consecutive ``CHUNK_SIZE``-character pieces."""
        size = self.CHUNK_SIZE
        return [text[i:i + size] for i in range(0, len(text), size)]

    def query(self, question):
        """Return a JSON string with the chunk most similar to *question*.

        Returns ``status`` 400 when no document has been processed yet,
        otherwise ``status`` 200 with a single best-match result.
        """
        if not self.active_doc:
            return json.dumps({
                "status": 400,
                "message": "No document uploaded",
                "results": [],
            })

        ques_emb = self.model.encode(question)
        similarities = cosine_similarity([ques_emb], self.embeddings)[0]
        # Convert numpy scalars up front so json.dumps needs no fallback
        # serializer (default=str would silently stringify anything).
        best_idx = int(np.argmax(similarities))
        confidence = float(similarities[best_idx])

        full_answer = self.text_chunks[best_idx]

        return json.dumps({
            "status": 200,
            "message": "Success",
            "results": [{
                "text": self._format_answer(full_answer, question),
                "confidence": confidence,
                "document_id": str(hash(self.active_doc)),
                "metadata": {
                    "chunk_index": best_idx,
                    "document": self.active_doc,
                },
            }],
        })

    def _format_answer(self, text, question):
        """Return a ~100-word window of *text* centred near the sentence
        sharing the most words with *question*, with a trailing ellipsis
        when the window is truncated."""
        sentences = re.split(r'(?<=[.!?]) +', text)
        question_words = set(question.lower().split())

        # Sentence with the largest word-overlap with the question.
        best_sentence = max(
            sentences,
            key=lambda s: len(set(s.lower().split()) & question_words),
            default="",
        )

        all_words = ' '.join(sentences).split()
        try:
            start = max(0, all_words.index(best_sentence.split()[0]) - 50)
        except (IndexError, ValueError):
            # best_sentence may be empty (IndexError on split()[0]) or its
            # first word absent after re-joining (ValueError from .index).
            start = 0
        end = start + 100

        suffix = "..." if end < len(all_words) else ""
        return ' '.join(all_words[start:end]) + suffix
|
|
|
def create_app():
    """Build the Gradio Blocks UI wired to a fresh :class:`PDFAnalyzer`.

    Returns the (un-launched) ``gr.Blocks`` application.
    """
    analyzer = PDFAnalyzer()

    def format_response(response):
        """Render the analyzer's JSON reply as Markdown for the chat pane."""
        try:
            data = json.loads(response)
        except json.JSONDecodeError:
            return "Error processing response"
        if data.get('status') != 200:
            return f"Error: {data.get('message', 'Unknown error')}"
        try:
            result = data['results'][0]
            return f"**Answer** ({result['confidence']:.2f} confidence):\n{result['text']}"
        except (KeyError, IndexError, TypeError, ValueError):
            # Malformed success payload (missing keys / non-numeric confidence).
            return "Error processing response"

    def clear_session():
        """Reset the widgets AND the analyzer's loaded document.

        Clearing only the UI would leave the previous PDF active, so
        queries after "clear" would still answer from the old document.
        """
        analyzer.text_chunks = []
        analyzer.embeddings = None
        analyzer.active_doc = None
        return [None, None, "**Status:** Session cleared"]

    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# π PDF QA Assistant (Cohere-style API)")

        with gr.Row():
            with gr.Column(scale=1):
                pdf_upload = gr.File(label="Upload PDF", file_types=[".pdf"])
                status = gr.Markdown("**Status:** Idle")
                gr.Button("Process PDF").click(
                    lambda f: analyzer.process_pdf(f.name) if f else json.dumps({"status": 400, "error": "No file"}),
                    inputs=pdf_upload,
                    outputs=status
                )

            with gr.Column(scale=2):
                chatbot = gr.Chatbot(height=400)
                question = gr.Textbox(label="Query", placeholder="Enter your question...")
                question.submit(
                    lambda q, h: h + [(q, format_response(analyzer.query(q)))],
                    inputs=[question, chatbot],
                    outputs=chatbot
                )
                gr.Button("Clear Session").click(
                    clear_session,
                    outputs=[chatbot, pdf_upload, status]
                )

    return app
|
|
|
if __name__ == "__main__":
    # Build the UI and start the Gradio server when run as a script.
    application = create_app()
    application.launch()