import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import PyPDF2
import docx
import io
import os
import re
from typing import List, Optional, Dict, Tuple
import json
from collections import Counter


class SmartDocumentRAG:
    def __init__(self):
        print("🚀 Initializing Enhanced Smart RAG System...")

        # Initialize better embedding model
        self.embedder = SentenceTransformer('all-mpnet-base-v2')  # Better than MiniLM
        print("✅ Enhanced embedding model loaded")

        # Initialize quantized LLM
        self.setup_llm()

        # Document storage
        self.documents = []
        self.document_metadata = []
        self.index = None
        self.is_indexed = False
        self.raw_text = ""
        self.document_type = "general"
        self.document_summary = ""
        self.sentence_embeddings = []  # Store sentence-level embeddings
        self.sentences = []            # Store individual sentences

    def setup_llm(self):
        """Setup optimized model for better text generation"""
        try:
            if not torch.cuda.is_available():
                print("⚠️ CUDA not available, using CPU-optimized model")
                self.setup_cpu_model()
                return

            # Default lightweight conversational model; Mistral is tried as a fallback below
            model_name = "microsoft/DialoGPT-medium"

            try:
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                self.model = AutoModelForCausalLM.from_pretrained(
                    model_name,
                    torch_dtype=torch.float16,
                    device_map="auto"
                )

                if self.tokenizer.pad_token is None:
                    self.tokenizer.pad_token = self.tokenizer.eos_token

                print("✅ Enhanced Q&A model loaded successfully")

            except Exception as e:
                print(f"Falling back to Mistral: {e}")
                self.setup_mistral_model()

        except Exception as e:
            print(f"❌ Error loading models: {e}")
            self.setup_cpu_model()

    def setup_mistral_model(self):
        """Setup Mistral with better configuration"""
        try:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )

            model_name = "mistralai/Mistral-7B-Instruct-v0.1"
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                quantization_config=quantization_config,
                device_map="auto",
                torch_dtype=torch.float16
            )

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            print("✅ Mistral model loaded")

        except Exception as e:
            print(f"❌ Mistral failed: {e}")
            self.setup_cpu_model()

    def setup_cpu_model(self):
        """Setup CPU-friendly model"""
        try:
            model_name = "distilgpt2"  # Lighter than GPT-2 medium
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(model_name)

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            print("✅ CPU model loaded")

        except Exception as e:
            print(f"❌ All models failed: {e}")
            self.model = None
            self.tokenizer = None

    def detect_document_type(self, text: str) -> str:
        """Enhanced document type detection"""
        text_lower = text.lower()

        # More comprehensive keyword matching
        resume_patterns = [
            'experience', 'skills', 'education', 'linkedin', 'email', 'phone',
            'work experience', 'employment', 'resume', 'cv', 'curriculum vitae',
            'internship', 'projects', 'achievements', 'career', 'profile'
        ]

        research_patterns = [
            'abstract', 'introduction', 'methodology', 'conclusion', 'references',
            'literature review', 'hypothesis', 'study', 'research', 'findings',
            'data analysis', 'results', 'discussion', 'bibliography'
        ]

        business_patterns = [
            'company', 'revenue', 'market', 'strategy',
            'business', 'financial', 'quarter', 'profit', 'sales', 'growth',
            'investment', 'stakeholder', 'operations', 'management', 'corporate', 'enterprise'
        ]

        technical_patterns = [
            'implementation', 'algorithm', 'system', 'technical', 'specification',
            'architecture', 'development', 'software', 'programming', 'api',
            'database', 'framework', 'deployment', 'infrastructure'
        ]

        # Count matches with higher weights for exact phrases
        def count_matches(patterns, text):
            score = 0
            for pattern in patterns:
                if pattern in text:
                    score += text.count(pattern)
            return score

        scores = {
            'resume': count_matches(resume_patterns, text_lower),
            'research': count_matches(research_patterns, text_lower),
            'business': count_matches(business_patterns, text_lower),
            'technical': count_matches(technical_patterns, text_lower)
        }

        max_score = max(scores.values())
        if max_score > 3:
            return max(scores, key=scores.get)
        return 'general'

    def create_document_summary(self, text: str) -> str:
        """Enhanced document summary creation"""
        try:
            # Clean and prepare text
            clean_text = re.sub(r'\s+', ' ', text).strip()
            sentences = re.split(r'[.!?]+', clean_text)
            sentences = [s.strip() for s in sentences if len(s.strip()) > 20]

            if not sentences:
                return "Document contains basic information."

            # Extract key information based on document type
            if self.document_type == 'resume':
                return self.extract_resume_summary(sentences)
            elif self.document_type == 'research':
                return self.extract_research_summary(sentences)
            elif self.document_type == 'business':
                return self.extract_business_summary(sentences)
            else:
                return self.extract_general_summary(sentences)

        except Exception as e:
            print(f"Summary creation error: {e}")
            return "Document summary not available."

    def extract_resume_summary(self, sentences: List[str]) -> str:
        """Extract resume-specific summary"""
        key_info = []

        # Look for name, role, experience
        for sentence in sentences[:10]:  # Check first 10 sentences
            lower = sentence.lower()
            if any(word in lower for word in ['engineer', 'developer', 'manager', 'analyst', 'specialist']):
                key_info.append(sentence)
            if any(word in lower for word in ['years', 'experience', 'worked']):
                key_info.append(sentence)
            if len(key_info) >= 2:
                break

        if key_info:
            return '. '.join(key_info[:2]) + '.'
        return "Resume of a professional with relevant experience and skills."

    def extract_research_summary(self, sentences: List[str]) -> str:
        """Extract research paper summary"""
        abstract_sentences = []
        intro_sentences = []

        for sentence in sentences:
            lower = sentence.lower()
            if any(word in lower for word in ['study', 'research', 'analysis', 'findings']):
                if len(sentence) > 50:  # Substantial sentences
                    abstract_sentences.append(sentence)
            elif any(word in lower for word in ['propose', 'method', 'approach']):
                intro_sentences.append(sentence)

        summary_sentences = (abstract_sentences + intro_sentences)[:2]
        if summary_sentences:
            return '. '.join(summary_sentences) + '.'
        return "Research document with methodology and findings."

    def extract_business_summary(self, sentences: List[str]) -> str:
        """Extract business document summary"""
        business_sentences = []

        for sentence in sentences:
            lower = sentence.lower()
            if any(word in lower for word in ['company', 'business', 'market', 'strategy', 'revenue']):
                if len(sentence) > 40:
                    business_sentences.append(sentence)

        if business_sentences:
            return '. '.join(business_sentences[:2]) + '.'
        return "Business document containing strategic and operational information."

    def extract_general_summary(self, sentences: List[str]) -> str:
        """Extract general document summary"""
        # Take the most informative sentences (longer ones with key terms)
        scored_sentences = []

        for sentence in sentences:
            score = len(sentence.split())  # Word count as base score
            if any(word in sentence.lower() for word in ['important', 'key', 'main', 'primary']):
                score += 10
            scored_sentences.append((sentence, score))

        # Sort by score and take top sentences
        scored_sentences.sort(key=lambda x: x[1], reverse=True)
        top_sentences = [s[0] for s in scored_sentences[:2]]

        if top_sentences:
            return '. '.join(top_sentences) + '.'
        return "Document contains relevant information and details."

    def extract_text_from_file(self, file_path: str) -> str:
        """Enhanced text extraction with better error handling"""
        try:
            file_extension = os.path.splitext(file_path)[1].lower()

            if file_extension == '.pdf':
                return self.extract_from_pdf(file_path)
            elif file_extension == '.docx':
                return self.extract_from_docx(file_path)
            elif file_extension == '.txt':
                return self.extract_from_txt(file_path)
            else:
                return f"Unsupported file format: {file_extension}"
        except Exception as e:
            return f"Error reading file: {str(e)}"

    def extract_from_pdf(self, file_path: str) -> str:
        """Enhanced PDF extraction with better text cleaning"""
        text = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages):
                    page_text = page.extract_text()
                    if page_text.strip():
                        # Clean the text
                        page_text = re.sub(r'\s+', ' ', page_text)
                        text += f"{page_text}\n"
        except Exception as e:
            text = f"Error reading PDF: {str(e)}"
        return text.strip()

    def extract_from_docx(self, file_path: str) -> str:
        """Enhanced DOCX extraction"""
        try:
            doc = docx.Document(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text += paragraph.text.strip() + "\n"
            return text.strip()
        except Exception as e:
            return f"Error reading DOCX: {str(e)}"

    def extract_from_txt(self, file_path: str) -> str:
        """Enhanced TXT extraction"""
        encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    content = file.read()
                    # Clean the content
                    content = re.sub(r'\s+', ' ', content)
                    return content.strip()
            except UnicodeDecodeError:
                continue
            except Exception as e:
                return f"Error reading TXT: {str(e)}"

        return "Error: Could not decode file with any supported encoding"

    def enhanced_chunk_text(self, text: str) -> List[Dict]:
        """Enhanced chunking strategy for better retrieval"""
        if not text.strip():
            return []

        chunks = []

        # Split into sentences first
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 15]

        # Store sentences for fine-grained retrieval
        self.sentences = sentences

        # Create overlapping chunks
        chunk_size = 3  # sentences per chunk
        overlap = 1     # sentence overlap

        for i in range(0, len(sentences), chunk_size - overlap):
            chunk_sentences = sentences[i:i + chunk_size]
            if chunk_sentences:
                chunk_text = '. '.join(chunk_sentences)
                if len(chunk_text.strip()) > 20:
                    chunks.append({
                        'text': chunk_text + '.',
                        'sentence_indices': list(range(i, min(i + chunk_size, len(sentences)))),
                        'doc_type': self.document_type
                    })

        return chunks

    def process_documents(self, files) -> str:
        """Enhanced document processing"""
        if not files:
            return "❌ No files uploaded!"
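
        # Processing pipeline: extract text from each uploaded file, detect the
        # document type, build a summary, chunk the text into overlapping
        # sentence groups, embed the chunks, and index them in FAISS.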
        try:
            all_text = ""
            processed_files = []

            for file in files:
                if file is None:
                    continue

                file_text = self.extract_text_from_file(file.name)
                if not file_text.startswith("Error") and not file_text.startswith("Unsupported"):
                    all_text += f"\n{file_text}"
                    processed_files.append(os.path.basename(file.name))
                else:
                    return f"❌ {file_text}"

            if not all_text.strip():
                return "❌ No text extracted from files!"

            # Store and analyze
            self.raw_text = all_text
            self.document_type = self.detect_document_type(all_text)
            self.document_summary = self.create_document_summary(all_text)

            # Enhanced chunking
            chunk_data = self.enhanced_chunk_text(all_text)
            if not chunk_data:
                return "❌ No valid text chunks created!"

            self.documents = [chunk['text'] for chunk in chunk_data]
            self.document_metadata = chunk_data

            # Create embeddings for chunks
            print(f"📄 Creating embeddings for {len(self.documents)} chunks...")
            embeddings = self.embedder.encode(self.documents, show_progress_bar=False)

            # Also create sentence-level embeddings for fine-grained search
            if self.sentences:
                print(f"📝 Creating sentence embeddings for {len(self.sentences)} sentences...")
                self.sentence_embeddings = self.embedder.encode(self.sentences, show_progress_bar=False)

            # Build FAISS index
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatIP(dimension)

            # Normalize for cosine similarity
            faiss.normalize_L2(embeddings)
            self.index.add(embeddings.astype('float32'))

            self.is_indexed = True

            return f"✅ Successfully processed {len(processed_files)} files:\n" + \
                   f"📄 Files: {', '.join(processed_files)}\n" + \
                   f"📊 Document Type: {self.document_type.title()}\n" + \
                   f"🔍 Created {len(self.documents)} chunks and {len(self.sentences)} sentences\n" + \
                   f"📝 Summary: {self.document_summary}\n" + \
                   f"🚀 Ready for enhanced Q&A!"

        except Exception as e:
            return f"❌ Error processing documents: {str(e)}"

    def find_relevant_content(self, query: str, k: int = 5) -> Tuple[str, List[str]]:
        """Enhanced content retrieval using multiple strategies"""
        if not self.is_indexed:
            return "", []

        try:
            query_lower = query.lower()
            relevant_content = []

            # Strategy 1: Semantic search using embeddings
            query_embedding = self.embedder.encode([query])
            faiss.normalize_L2(query_embedding)
            scores, indices = self.index.search(query_embedding.astype('float32'), min(k, len(self.documents)))

            semantic_matches = []
            for i, idx in enumerate(indices[0]):
                if idx < len(self.documents) and scores[0][i] > 0.2:  # Relevance threshold
                    semantic_matches.append(self.documents[idx])

            # Strategy 2: Keyword matching in sentences
            query_words = set(query_lower.split())
            keyword_matches = []

            for sentence in self.sentences:
                sentence_words = set(sentence.lower().split())
                overlap = len(query_words.intersection(sentence_words))
                if overlap >= 2:  # At least 2 word overlap
                    keyword_matches.append(sentence)

            # Strategy 3: Pattern matching for specific question types
            pattern_matches = []

            if any(word in query_lower for word in ['name', 'who']):
                # Look for names and identities
                for sentence in self.sentences:
                    if re.search(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', sentence):  # Name pattern
                        pattern_matches.append(sentence)

            if any(word in query_lower for word in ['experience', 'work', 'job']):
                # Look for experience-related content
                for sentence in self.sentences:
                    if any(word in sentence.lower() for word in ['year', 'experience', 'work', 'company', 'role']):
                        pattern_matches.append(sentence)

            if any(word in query_lower for word in ['skill', 'technology', 'tech']):
                # Look for skills and technologies
                for sentence in self.sentences:
                    if any(word in sentence.lower() for word in
                           ['skill', 'technology', 'programming', 'software']):
                        pattern_matches.append(sentence)

            # Combine all strategies
            all_matches = list(set(semantic_matches + keyword_matches + pattern_matches))

            # Sort by relevance (prefer shorter, more specific sentences)
            all_matches.sort(key=lambda x: len(x.split()))

            return '\n'.join(all_matches[:k]), all_matches[:k]

        except Exception as e:
            print(f"Error in content retrieval: {e}")
            return "", []

    def generate_direct_answer(self, query: str, context: str) -> str:
        """Generate direct, relevant answers"""
        if not context:
            return "No relevant information found in the document."

        query_lower = query.lower()
        context_sentences = [s.strip() for s in context.split('\n') if s.strip()]

        # Handle specific question types with direct extraction
        if any(word in query_lower for word in ['name', 'who is']):
            # Extract names
            for sentence in context_sentences:
                names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', sentence)
                if names:
                    return f"The person mentioned is {names[0]}."

        if any(word in query_lower for word in ['experience', 'years']):
            # Extract experience information
            for sentence in context_sentences:
                exp_match = re.search(r'(\d+)\s*(?:years?|yr)', sentence.lower())
                if exp_match:
                    return f"The experience mentioned is {exp_match.group(1)} years. {sentence}"

        if any(word in query_lower for word in ['skill', 'technology']):
            # Extract skills
            skills = []
            for sentence in context_sentences:
                # Look for programming languages, frameworks, etc.
                tech_words = ['python', 'java', 'javascript', 'react', 'node', 'sql', 'aws', 'docker']
                found_tech = [word for word in tech_words if word in sentence.lower()]
                if found_tech:
                    skills.extend(found_tech)

            if skills:
                return f"Technologies/skills mentioned include: {', '.join(set(skills))}. {context_sentences[0] if context_sentences else ''}"

        if any(word in query_lower for word in ['education', 'degree', 'university', 'college']):
            # Extract education information
            for sentence in context_sentences:
                if any(word in sentence.lower() for word in ['degree', 'university', 'college', 'bachelor', 'master']):
                    return sentence

        if any(word in query_lower for word in ['summary', 'about', 'overview']):
            return self.document_summary

        # For other questions, return the most relevant sentence
        if context_sentences:
            # Score sentences by query word overlap
            query_words = set(query_lower.split())
            scored_sentences = []

            for sentence in context_sentences:
                sentence_words = set(sentence.lower().split())
                overlap = len(query_words.intersection(sentence_words))
                scored_sentences.append((sentence, overlap))

            # Sort by overlap and return best match
            scored_sentences.sort(key=lambda x: x[1], reverse=True)
            if scored_sentences and scored_sentences[0][1] > 0:
                return scored_sentences[0][0]
            else:
                return context_sentences[0]  # Return first relevant sentence

        return "I found relevant content but couldn't extract a specific answer."

    def answer_question(self, query: str) -> str:
        """Main question answering function with enhanced accuracy"""
        if not query.strip():
            return "❓ Please ask a question!"

        if not self.is_indexed:
            return "📁 Please upload and process documents first!"

        try:
            # Handle summary requests directly
            query_lower = query.lower()
            if query_lower in ['summary', 'summarize', 'about', 'overview']:
                return f"📄 **Document Summary:**\n\n{self.document_summary}"

            # Find relevant content using enhanced retrieval
            context, matches = self.find_relevant_content(query, k=5)

            if not context:
                return "🔍 No relevant information found. Try rephrasing your question or asking about different aspects of the document."
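
            # Note: the answer below is extracted with rules and regexes from the
            # retrieved context; the language model loaded in setup_llm() is not
            # invoked for generation in this path.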
            # Generate direct answer
            answer = self.generate_direct_answer(query, context)

            # Add context if answer is too brief
            if len(answer) < 50 and matches:
                answer += f"\n\n**Additional context:** {matches[0][:200]}..."

            return answer

        except Exception as e:
            return f"❌ Error processing question: {str(e)}"


# Initialize the enhanced system
print("Initializing Enhanced Smart RAG System...")
rag_system = SmartDocumentRAG()


# Create the interface
def create_interface():
    with gr.Blocks(title="🧠 Enhanced Document Q&A", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 🧠 Enhanced Document Q&A System

        **Improved for Better Accuracy & Relevance!**

        **New Features:**
        - 🎯 Multi-strategy content retrieval
        - 📊 Direct answer extraction
        - 🔍 Enhanced keyword and pattern matching
        - 📚 Better handling of resumes, research papers, and business docs
        """)

        with gr.Tab("📤 Upload & Process"):
            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(
                        label="📁 Upload Documents",
                        file_count="multiple",
                        file_types=[".pdf", ".docx", ".txt"],
                        height=150
                    )
                    process_btn = gr.Button("🔄 Process Documents", variant="primary", size="lg")

                with gr.Column():
                    process_status = gr.Textbox(
                        label="📋 Processing Status & Analysis",
                        lines=10,
                        interactive=False
                    )

            process_btn.click(
                fn=rag_system.process_documents,
                inputs=[file_upload],
                outputs=[process_status]
            )

        with gr.Tab("❓ Enhanced Q&A"):
            with gr.Row():
                with gr.Column():
                    question_input = gr.Textbox(
                        label="🤔 Ask Your Question",
                        placeholder="What is the person's name? / How many years of experience? / What are their skills?",
                        lines=3
                    )

                    with gr.Row():
                        ask_btn = gr.Button("🧠 Get Answer", variant="primary")
                        summary_btn = gr.Button("📊 Get Summary", variant="secondary")

                with gr.Column():
                    answer_output = gr.Textbox(
                        label="💡 Enhanced Answer",
                        lines=8,
                        interactive=False
                    )

            ask_btn.click(
                fn=rag_system.answer_question,
                inputs=[question_input],
                outputs=[answer_output]
            )

            summary_btn.click(
                fn=lambda: rag_system.answer_question("summary"),
                inputs=[],
                outputs=[answer_output]
            )

            gr.Markdown("""
            ### 💡 Try These Specific Questions:

            **For Resumes:**
            - "What is the person's name?"
            - "How many years of experience do they have?"
            - "What are their technical skills?"
            - "What is their educational background?"
            - "What companies have they worked for?"

            **For Any Document:**
            - "Summarize this document"
            - "What is the main topic?"
            - "List the key points"
            """)

        with gr.Tab("🔧 System Info"):
            gr.Markdown("""
            ### 🚀 Enhanced Features:

            **Better Retrieval:**
            - Semantic search using embeddings
            - Keyword matching with context
            - Pattern recognition for names, dates, skills
            - Multi-level chunking (sentences + overlapping sentence chunks)

            **Improved Answers:**
            - Direct information extraction
            - Question-type specific processing
            - Context-aware responses
            - Relevance scoring and filtering

            **Document Types:**
            - ✅ Resumes (name, experience, skills extraction)
            - ✅ Research papers (methodology, findings)
            - ✅ Business documents (strategy, metrics)
            - ✅ Technical documentation (specifications)
            """)

    return demo


# Launch the app
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True
    )
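

# Minimal programmatic usage sketch (no Gradio UI), assuming a local "resume.pdf"
# and any object exposing a `.name` attribute, which is all `process_documents`
# reads (Gradio's uploaded-file objects provide it):
#
#     from types import SimpleNamespace
#     rag = SmartDocumentRAG()
#     print(rag.process_documents([SimpleNamespace(name="resume.pdf")]))
#     print(rag.answer_question("What are their technical skills?"))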