import faiss import numpy as np from pathlib import Path from typing import Dict, List, Optional from src.shared_utils.document_processing.pdf_parser import extract_text_with_metadata from src.shared_utils.document_processing.hybrid_parser import parse_pdf_with_hybrid_approach from src.shared_utils.embeddings.generator import generate_embeddings from src.shared_utils.retrieval.hybrid_search import HybridRetriever from src.shared_utils.retrieval.vocabulary_index import VocabularyIndex class BasicRAG: """Basic RAG system combining PDF processing, chunking, and embedding search.""" def __init__(self): """ Initialize FAISS index and document storage. Recommended Usage: - For production: Use hybrid_query() method (best performance + quality) - For research: enhanced_hybrid_query() available but not recommended """ self.index = None self.chunks = [] # Store chunk text and metadata self.embedding_dim = 384 # multi-qa-MiniLM-L6-cos-v1 dimension self.hybrid_retriever: Optional[HybridRetriever] = None self.vocabulary_index: Optional[VocabularyIndex] = None def index_document(self, pdf_path: Path) -> int: """ Process PDF into chunks, generate embeddings, and add to FAISS index. Args: pdf_path: Path to PDF file Returns: Number of chunks indexed """ try: # Extract text from PDF with metadata print(f"Extracting text from {pdf_path}...") text_data = extract_text_with_metadata(pdf_path) print(f"Extracted {len(text_data.get('text', ''))} characters") # Chunk the text using hybrid TOC + PDFPlumber approach print("Chunking text...") chunks = parse_pdf_with_hybrid_approach( pdf_path, text_data, target_chunk_size=1400, min_chunk_size=800, max_chunk_size=2000 ) print(f"Created {len(chunks)} chunks") except Exception as e: print(f"Error in document processing: {e}") import traceback print(f"Traceback: {traceback.format_exc()}") raise # Generate embeddings chunk_texts = [chunk["text"] for chunk in chunks] embeddings = generate_embeddings(chunk_texts) # Initialize FAISS index if first document if self.index is None: self.index = faiss.IndexFlatIP(self.embedding_dim) # Inner product for similarity # Add embeddings to FAISS index # Normalize embeddings for cosine similarity normalized_embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) self.index.add(normalized_embeddings.astype(np.float32)) # Store chunks with enhanced metadata from structure-preserving parser for i, chunk in enumerate(chunks): chunk_info = { "text": chunk["text"], "source": str(pdf_path), "page": chunk.get("page", 0), "chunk_id": len(self.chunks) + i, "start_char": chunk.get("start_char", 0), "end_char": chunk.get("end_char", len(chunk["text"])), # Structure-preserving metadata "title": chunk.get("title", ""), "parent_title": chunk.get("parent_title", ""), "context": chunk.get("context", ""), "level": chunk.get("level", 0), "quality_score": chunk.get("metadata", {}).get("quality_score", 0.0), "parsing_method": "structure_preserving" } self.chunks.append(chunk_info) # Initialize hybrid retriever and index chunks if self.hybrid_retriever is None: self.hybrid_retriever = HybridRetriever() # Re-index all chunks for hybrid search self.hybrid_retriever.index_documents(self.chunks) # Build or update vocabulary index if self.vocabulary_index is None: self.vocabulary_index = VocabularyIndex() # Build vocabulary from all chunks print("Building vocabulary index...") self.vocabulary_index.build_from_chunks(self.chunks) # Print vocabulary statistics stats = self.vocabulary_index.get_vocabulary_stats() print(f"Vocabulary stats: {stats['unique_terms']} unique terms, " f"{stats['technical_terms']} technical terms") return len(chunks) def index_documents(self, pdf_folder: Path) -> Dict[str, int]: """ Process multiple PDF documents from a folder into the unified index. Args: pdf_folder: Path to folder containing PDF files Returns: Dict mapping document names to number of chunks indexed Raises: ValueError: If folder doesn't exist or no PDFs found """ if not pdf_folder.exists() or not pdf_folder.is_dir(): raise ValueError(f"PDF folder not found: {pdf_folder}") pdf_files = list(pdf_folder.glob("*.pdf")) if not pdf_files: raise ValueError(f"No PDF files found in {pdf_folder}") results = {} total_chunks = 0 print(f"Processing {len(pdf_files)} PDF documents...") for pdf_file in pdf_files: print(f"\nProcessing: {pdf_file.name}") try: chunk_count = self.index_document(pdf_file) results[pdf_file.name] = chunk_count total_chunks += chunk_count print(f" āœ… Indexed {chunk_count} chunks") except Exception as e: print(f" āŒ Failed to process {pdf_file.name}: {e}") results[pdf_file.name] = 0 print(f"\nšŸ“Š Multi-document indexing complete:") print(f" - {len([r for r in results.values() if r > 0])}/{len(pdf_files)} documents processed successfully") print(f" - {total_chunks} total chunks indexed") print(f" - {len(set(chunk['source'] for chunk in self.chunks))} unique sources") return results def query(self, question: str, top_k: int = 5, similarity_threshold: float = 0.3) -> Dict: """ Search for relevant chunks and return results. Args: question: User question top_k: Number of top results to return similarity_threshold: Minimum similarity score (0.3 = 30% similarity) Returns: Dict with question, relevant chunks, and sources """ if self.index is None or len(self.chunks) == 0: return {"question": question, "chunks": [], "sources": []} # Generate embedding for question question_embedding = generate_embeddings([question]) normalized_question = question_embedding / np.linalg.norm(question_embedding, axis=1, keepdims=True) # Search FAISS index scores, indices = self.index.search(normalized_question.astype(np.float32), top_k) # Retrieve relevant chunks with similarity filtering relevant_chunks = [] sources = set() for score, idx in zip(scores[0], indices[0]): if idx < len(self.chunks) and float(score) >= similarity_threshold: # Filter by similarity chunk = self.chunks[idx].copy() chunk["similarity_score"] = float(score) relevant_chunks.append(chunk) sources.add(chunk["source"]) return { "question": question, "chunks": relevant_chunks, "sources": list(sources), "similarity_threshold": similarity_threshold, "total_candidates": len([s for s in scores[0] if s >= similarity_threshold]) } def hybrid_query(self, question: str, top_k: int = 5, dense_weight: float = 0.7, similarity_threshold: float = 0.3) -> Dict: """ Enhanced query using hybrid dense + sparse retrieval. Combines semantic similarity (embeddings) with keyword matching (BM25) using Reciprocal Rank Fusion for optimal relevance ranking. Args: question: User query top_k: Number of results to return dense_weight: Weight for dense retrieval (0.7 = 70% semantic, 30% keyword) similarity_threshold: Minimum similarity score to include results (0.3 = 30%) Returns: Enhanced results with hybrid_score field and retrieval method indicators Raises: ValueError: If hybrid retriever not initialized """ if self.hybrid_retriever is None or len(self.chunks) == 0: return {"question": question, "chunks": [], "sources": [], "retrieval_method": "none"} # Perform hybrid search try: # Update hybrid retriever weight if different if abs(self.hybrid_retriever.dense_weight - dense_weight) > 0.01: self.hybrid_retriever.dense_weight = dense_weight hybrid_results = self.hybrid_retriever.search(question, top_k, similarity_threshold=similarity_threshold) # Process results for consistency with basic query format relevant_chunks = [] sources = set() for chunk_idx, rrf_score, chunk_dict in hybrid_results: # Add hybrid-specific metadata enhanced_chunk = chunk_dict.copy() enhanced_chunk["hybrid_score"] = float(rrf_score) enhanced_chunk["retrieval_method"] = "hybrid" relevant_chunks.append(enhanced_chunk) sources.add(enhanced_chunk["source"]) # Get retrieval statistics for transparency stats = self.hybrid_retriever.get_retrieval_stats() return { "question": question, "chunks": relevant_chunks, "sources": list(sources), "retrieval_method": "hybrid", "dense_weight": dense_weight, "sparse_weight": 1.0 - dense_weight, "stats": stats } except Exception as e: # Fallback to basic semantic search on hybrid failure print(f"Hybrid search failed: {e}") print("Falling back to basic semantic search...") basic_result = self.query(question, top_k) basic_result["retrieval_method"] = "fallback_semantic" basic_result["error"] = str(e) return basic_result def enhanced_hybrid_query(self, question: str, top_k: int = 5, enable_enhancement: bool = False) -> Dict: """ Hybrid query with optional enhancement (DISABLED BY DEFAULT). Based on comprehensive evaluation, query enhancement does not provide meaningful improvements and adds computational overhead. Enhancement is disabled by default and standard hybrid search is recommended. Evaluation Results: - Enhancement shows no statistical significance (p=0.374) - 1.7x slower than standard hybrid search - Lower quality scores than baseline methods Args: question: User query string top_k: Number of results to return enable_enhancement: Enable query enhancement (NOT RECOMMENDED) Returns: Hybrid search results with optional enhancement metadata Recommendation: Use hybrid_query() directly for better performance """ if not question or not question.strip(): return { "question": question, "chunks": [], "sources": [], "retrieval_method": "none", "enhancement_applied": False } # Check if enhancement is enabled (DISABLED BY DEFAULT) if not enable_enhancement: # Use standard hybrid search (RECOMMENDED) hybrid_result = self.hybrid_query(question, top_k) hybrid_result.update({ "original_query": question, "enhancement_applied": False, "enhancement_disabled": True, "retrieval_method": "hybrid_recommended", "note": "Enhancement disabled based on evaluation - use hybrid_query() directly" }) return hybrid_result try: # Enhancement enabled (NOT RECOMMENDED - adds overhead without benefit) from src.shared_utils.query_processing.query_enhancer import QueryEnhancer # Initialize enhancer enhancer = QueryEnhancer() # Step 1: Get baseline semantic results for quality comparison baseline_result = self.query(question, top_k) baseline_score = 0.0 if baseline_result.get('chunks'): baseline_score = baseline_result['chunks'][0].get('similarity_score', 0.0) # Step 2: Perform vocabulary-aware enhancement if available if self.vocabulary_index is not None: enhancement_result = enhancer.enhance_query_with_vocabulary( question, vocabulary_index=self.vocabulary_index, min_frequency=3 ) else: # Fallback to conservative enhancement enhancement_result = enhancer.enhance_query(question, conservative=True) enhanced_query = enhancement_result['enhanced_query'] optimal_weight = enhancement_result['optimal_weight'] analysis = enhancement_result['analysis'] metadata = enhancement_result['enhancement_metadata'] # Step 3: Quality check - only enhance if expansion is minimal expansion_ratio = metadata.get('expansion_ratio', 1.0) should_enhance = ( expansion_ratio <= 2.0 and # Limit expansion bloat analysis.get('technical_term_count', 0) > 0 # Has technical content ) if should_enhance: # Execute hybrid search with enhanced query hybrid_result = self.hybrid_query(enhanced_query, top_k, optimal_weight) # Enhance result with query enhancement metadata hybrid_result.update({ "original_query": question, "enhanced_query": enhanced_query, "adaptive_weight": optimal_weight, "query_analysis": analysis, "enhancement_metadata": metadata, "enhancement_applied": True, "retrieval_method": "enhanced_hybrid", "baseline_score": baseline_score, "quality_validated": True, "warning": "Enhancement enabled despite evaluation showing no benefit" }) return hybrid_result else: # Enhancement not beneficial - use standard hybrid hybrid_result = self.hybrid_query(question, top_k) hybrid_result.update({ "original_query": question, "enhancement_applied": False, "fallback_reason": f"Enhancement not beneficial (expansion: {expansion_ratio:.1f}x)", "baseline_score": baseline_score, "quality_validated": True }) return hybrid_result except ImportError: # QueryEnhancer not available - fallback to basic hybrid print("QueryEnhancer not available, falling back to standard hybrid search") result = self.hybrid_query(question, top_k) result["enhancement_applied"] = False result["fallback_reason"] = "QueryEnhancer import failed" return result except Exception as e: # Enhancement failed - fallback to basic hybrid print(f"Query enhancement failed: {e}") print("Falling back to standard hybrid search...") try: result = self.hybrid_query(question, top_k) result.update({ "original_query": question, "enhancement_applied": False, "enhancement_error": str(e), "fallback_reason": "Enhancement processing failed" }) return result except Exception as hybrid_error: # Both enhancement and hybrid failed - fallback to semantic print(f"Hybrid search also failed: {hybrid_error}") print("Falling back to basic semantic search...") semantic_result = self.query(question, top_k) semantic_result.update({ "original_query": question, "retrieval_method": "fallback_semantic", "enhancement_applied": False, "enhancement_error": str(e), "hybrid_error": str(hybrid_error), "fallback_reason": "Both enhancement and hybrid failed" }) return semantic_result