|
import gradio as gr |
|
import PyPDF2 |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import re |
|
import io |
|
|
|
class SimpleRAG:
    """Minimal retrieval pipeline over a single PDF document.

    A PDF is converted to text, split into sentence-based chunks, and every
    chunk is embedded with a SentenceTransformer model. A question is
    answered by returning the chunks whose embeddings have the highest
    cosine similarity to the embedding of the question.
    """

    def __init__(self):
        """Load the embedding model and start with no document indexed."""
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        # Text chunks of the currently indexed document.
        self.chunks = []
        # Output of ``model.encode(chunks)``; entry i belongs to ``chunks[i]``.
        self.embeddings = None
        # True once a document has been indexed successfully.
        self.document_processed = False

    def extract_text_from_pdf(self, pdf_file):
        """Return the concatenated text of every page of a PDF.

        Args:
            pdf_file: Raw PDF content as bytes.

        Returns:
            The text of all pages, each page followed by a newline.

        Raises:
            RuntimeError: If the PDF cannot be read or parsed. The
                underlying error is attached as ``__cause__``.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
            # ``or ""`` keeps a page that yields no text from breaking the
            # concatenation.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )
        except Exception as e:
            raise RuntimeError(f"Error reading PDF: {str(e)}") from e

    def chunk_text(self, text, chunk_size=500, overlap=100):
        """Split text into sentence-aligned chunks with optional overlap.

        The text is split on runs of ``.``, ``!`` and ``?``; every sentence
        is re-terminated with ``". "`` when chunks are assembled. Sentences
        are packed greedily into a chunk until adding the next one would
        exceed ``chunk_size``. A single sentence longer than ``chunk_size``
        still becomes a chunk of its own.

        Args:
            text: The text to split.
            chunk_size: Target maximum chunk length in characters.
            overlap: Maximum number of characters of trailing whole
                sentences repeated at the start of the following chunk.
                ``0`` disables overlap.

        Returns:
            The chunks in document order. Chunks of 50 characters or fewer
            are discarded.
        """
        chunks = []
        pieces = []  # "<sentence>. " fragments of the chunk being built
        size = 0  # total length of ``pieces``

        for raw_sentence in re.split(r'[.!?]+', text):
            sentence = raw_sentence.strip()
            if not sentence:
                continue

            if pieces and size + len(sentence) > chunk_size:
                chunks.append("".join(pieces).strip())

                # Carry over the trailing whole sentences that fit into
                # ``overlap`` characters so context spans chunk borders.
                tail = []
                tail_size = 0
                for piece in reversed(pieces):
                    if tail_size + len(piece) > overlap:
                        break
                    tail.append(piece)
                    tail_size += len(piece)
                tail.reverse()

                # Never let the carried-over text push the new chunk past
                # ``chunk_size``; drop the oldest carried sentences first.
                while tail and tail_size + len(sentence) > chunk_size:
                    tail_size -= len(tail.pop(0))

                pieces, size = tail, tail_size

            pieces.append(sentence + ". ")
            size += len(sentence) + 2

        if pieces:
            chunks.append("".join(pieces).strip())

        # Very short chunks are dropped as not meaningful.
        return [chunk for chunk in chunks if len(chunk) > 50]

    def process_document(self, pdf_file):
        """Index a PDF so that questions can be asked about it.

        Args:
            pdf_file: Raw PDF content as bytes.

        Returns:
            A human-readable status message. Failures are reported in the
            message rather than raised.
        """
        try:
            text = self.extract_text_from_pdf(pdf_file)
            if not text.strip():
                return "Error: Could not extract text from PDF. Please try a different file."

            chunks = self.chunk_text(text)
            if not chunks:
                return "Error: No meaningful content found in the PDF."

            embeddings = self.model.encode(chunks)
        except Exception as e:
            return f"Error processing document: {str(e)}"

        # Commit only after every step succeeded, so a failed upload never
        # leaves chunks and embeddings of different documents mixed.
        self.chunks = chunks
        self.embeddings = embeddings
        self.document_processed = True

        return f"Document processed successfully! Created {len(self.chunks)} text chunks. You can now ask questions."

    def search_similar_chunks(self, query, top_k=3):
        """Find the chunks most similar to a query.

        Args:
            query: The query text.
            top_k: Maximum number of chunks to return.

        Returns:
            A list of ``{'chunk': ..., 'similarity': ...}`` dicts ordered
            from most to least similar, or a message string when no
            document has been processed or the search fails.
        """
        if not self.document_processed:
            return "Please upload and process a document first."

        try:
            query_embedding = self.model.encode([query])
            similarities = cosine_similarity(query_embedding, self.embeddings)[0]
            # argsort is ascending; reverse it to get best matches first.
            top_indices = np.argsort(similarities)[::-1][:top_k]
            return [
                {'chunk': self.chunks[idx], 'similarity': similarities[idx]}
                for idx in top_indices
            ]
        except Exception as e:
            return f"Error searching: {str(e)}"

    def answer_question(self, question):
        """Build a formatted answer from the chunks most relevant to a question.

        Args:
            question: The question text; ``None`` or blank input is
                answered with a prompt to enter a question.

        Returns:
            The formatted answer, or a message string explaining why no
            answer could be produced.
        """
        if not self.document_processed:
            return "Please upload and process a document first."

        if not question or not question.strip():
            return "Please enter a question."

        results = self.search_similar_chunks(question)

        # A string result is a status/error message from the search step.
        if isinstance(results, str):
            return results

        if not results:
            return "No relevant information found in the document."

        parts = ["Based on the document, here's what I found:\n\n"]
        for number, result in enumerate(results, start=1):
            parts.append(
                f"**Relevant section {number}** (similarity: {result['similarity']:.3f}):\n"
            )
            parts.append(f"{result['chunk']}\n\n")

        return "".join(parts)
|
|
|
# Single shared instance used by the Gradio callbacks below; constructing it
# loads the embedding model.
rag_system: SimpleRAG = SimpleRAG()
|
|
|
def process_pdf(pdf_file):
    """Gradio callback: read the uploaded PDF and index it.

    Args:
        pdf_file: The upload as delivered by the file component: either a
            filesystem path (``str`` or path-like) or an object exposing
            the path as ``.name``. ``None`` when nothing was uploaded.

    Returns:
        A status message describing the outcome. Errors are reported in
        the message rather than raised.
    """
    if pdf_file is None:
        return "Please upload a PDF file."

    try:
        # A path is used as-is; anything else is expected to be a file
        # wrapper carrying its path in ``.name``. Checking for a path first
        # matters because ``pathlib.Path.name`` is only the basename.
        if isinstance(pdf_file, str) or hasattr(pdf_file, "__fspath__"):
            pdf_path = pdf_file
        else:
            pdf_path = pdf_file.name

        with open(pdf_path, 'rb') as f:
            pdf_content = f.read()

        return rag_system.process_document(pdf_content)

    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
def ask_question(question):
    """Gradio callback: answer ``question`` using the shared ``rag_system``.

    Returns whatever text ``rag_system.answer_question`` produces.
    """
    response = rag_system.answer_question(question)
    return response
|
|
|
# Build the web interface: the left column uploads and processes a PDF, the
# right column asks questions about it.
with gr.Blocks(title="RAG MVP - Document Q&A", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # RAG MVP - Document Q&A System

    Upload a PDF document and ask questions about its content. The system will find relevant information and provide answers based on the document.

    **How to use:**
    1. Upload a PDF file
    2. Wait for processing to complete
    3. Ask questions about the document
    """)

    with gr.Row():
        # Left column: document upload and processing status.
        with gr.Column(scale=1):
            pdf_input = gr.File(
                label="Upload PDF Document", file_types=[".pdf"], type="filepath"
            )
            process_btn = gr.Button("Process Document", variant="primary")
            process_output = gr.Textbox(
                label="Processing Status", lines=3, interactive=False
            )

        # Right column: question entry and the retrieved answer.
        with gr.Column(scale=1):
            question_input = gr.Textbox(
                label="Ask a Question",
                placeholder="What is this document about?",
                lines=2,
            )
            ask_btn = gr.Button("Get Answer", variant="secondary")
            answer_output = gr.Textbox(label="Answer", lines=10, interactive=False)

    gr.Markdown("""
    ### Sample Questions to Try:
    - What is the main topic of this document?
    - Can you summarize the key points?
    - What are the important details mentioned?
    """)

    process_btn.click(fn=process_pdf, inputs=[pdf_input], outputs=[process_output])

    # Clicking the button and pressing Enter in the question box run the
    # same query, so both events are wired identically.
    for register_event in (ask_btn.click, question_input.submit):
        register_event(
            fn=ask_question, inputs=[question_input], outputs=[answer_output]
        )


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    demo.launch()