""" Modular Unified Retriever for Architecture Compliance. This module provides a modular implementation of the unified retriever that decomposes functionality into well-defined sub-components following the architecture specification. """ import logging import time from typing import List, Dict, Any, Optional, Tuple import numpy as np from src.core.interfaces import Retriever, Document, RetrievalResult, Embedder, HealthStatus # Forward declaration to avoid circular import from typing import TYPE_CHECKING if TYPE_CHECKING: from src.core.platform_orchestrator import PlatformOrchestrator from .indices.base import VectorIndex from .indices.faiss_index import FAISSIndex from .indices.weaviate_index import WeaviateIndex from .sparse.base import SparseRetriever from .sparse.bm25_retriever import BM25Retriever from .fusion.base import FusionStrategy from .fusion.rrf_fusion import RRFFusion from .fusion.weighted_fusion import WeightedFusion from .fusion.graph_enhanced_fusion import GraphEnhancedRRFFusion from .fusion.score_aware_fusion import ScoreAwareFusion from .rerankers.base import Reranker from .rerankers.semantic_reranker import SemanticReranker from .rerankers.identity_reranker import IdentityReranker from .rerankers.neural_reranker import NeuralReranker logger = logging.getLogger(__name__) class ModularUnifiedRetriever(Retriever): """ Modular unified retriever with pluggable sub-components. This implementation follows the architecture specification by decomposing the retrieval functionality into four distinct sub-components: - Vector Index: Handles dense semantic search - Sparse Retriever: Handles keyword-based search - Fusion Strategy: Combines dense and sparse results - Reranker: Improves result relevance Each sub-component can be independently configured and tested, improving modularity and maintainability. Features: - Architecture-compliant modular design - Configurable sub-components - Component factory integration - Performance monitoring - Backward compatibility with UnifiedRetriever - Enhanced logging and debugging Example: config = { "vector_index": { "type": "faiss", "config": {"index_type": "IndexFlatIP", "normalize_embeddings": True} }, "sparse": { "type": "bm25", "config": {"k1": 1.2, "b": 0.75} }, "fusion": { "type": "rrf", "config": {"k": 60, "weights": {"dense": 0.7, "sparse": 0.3}} }, "reranker": { "type": "semantic", "config": {"enabled": True, "model": "cross-encoder/ms-marco-MiniLM-L-6-v2"} } } retriever = ModularUnifiedRetriever(config, embedder) """ def __init__(self, config: Dict[str, Any], embedder: Embedder): """ Initialize the modular unified retriever. 

        Args:
            config: Configuration dictionary with sub-component specifications
            embedder: Embedder component for query encoding
        """
        self.config = config
        self.embedder = embedder
        self.documents: List[Document] = []

        # Composite filtering configuration (NEW)
        composite_config = config.get("composite_filtering", {})
        self.composite_filtering_enabled = composite_config.get("enabled", False)
        self.fusion_weight = composite_config.get("fusion_weight", 0.7)
        self.semantic_weight = composite_config.get("semantic_weight", 0.3)
        self.min_composite_score = composite_config.get("min_composite_score", 0.4)
        self.max_candidates_multiplier = composite_config.get("max_candidates", 15) / 10.0  # Convert to multiplier (1.5x)

        # Legacy semantic gap detection configuration (DEPRECATED)
        self.min_semantic_alignment = config.get("min_semantic_alignment", 0.3)

        # Initialize sub-components
        self.vector_index = self._create_vector_index(config.get("vector_index", {}))
        self.sparse_retriever = self._create_sparse_retriever(config.get("sparse", {}))
        self.fusion_strategy = self._create_fusion_strategy(config.get("fusion", {}))
        self.reranker = self._create_reranker(config.get("reranker", {}))

        # Performance tracking
        self.retrieval_stats = {
            "total_retrievals": 0,
            "total_time": 0.0,
            "avg_time": 0.0,
            "last_retrieval_time": 0.0
        }

        # Backend management (for multi-backend performance testing)
        self.active_backend_name = "faiss"  # Default backend
        self.available_backends = ["faiss", "weaviate"]
        self.backend_switch_count = 0

        # Platform services (initialized via initialize_services)
        self.platform: Optional['PlatformOrchestrator'] = None

        logger.info("ModularUnifiedRetriever initialized with all sub-components")
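
    # Note: each _create_* factory below consumes a {"type": ..., "config": {...}}
    # mapping (see the class docstring example) and raises ValueError for unknown
    # types, so misconfiguration fails fast at construction time rather than at
    # query time.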

    def _create_vector_index(self, config: Dict[str, Any]) -> VectorIndex:
        """Create vector index sub-component."""
        index_type = config.get("type", "faiss")
        index_config = config.get("config", {})

        if index_type == "faiss":
            return FAISSIndex(index_config)
        elif index_type == "weaviate":
            return WeaviateIndex(index_config)
        else:
            raise ValueError(f"Unknown vector index type: {index_type}")

    def _create_sparse_retriever(self, config: Dict[str, Any]) -> SparseRetriever:
        """Create sparse retriever sub-component."""
        retriever_type = config.get("type", "bm25")
        retriever_config = config.get("config", {})

        if retriever_type == "bm25":
            return BM25Retriever(retriever_config)
        else:
            raise ValueError(f"Unknown sparse retriever type: {retriever_type}")

    def _create_fusion_strategy(self, config: Dict[str, Any]) -> FusionStrategy:
        """Create fusion strategy sub-component."""
        fusion_type = config.get("type", "rrf")
        fusion_config = config.get("config", {})

        if fusion_type == "rrf":
            return RRFFusion(fusion_config)
        elif fusion_type == "weighted":
            return WeightedFusion(fusion_config)
        elif fusion_type == "graph_enhanced_rrf":
            return GraphEnhancedRRFFusion(fusion_config)
        elif fusion_type == "score_aware":
            return ScoreAwareFusion(fusion_config)
        else:
            raise ValueError(
                f"Unknown fusion strategy type: {fusion_type}. "
                f"Available options: rrf, weighted, graph_enhanced_rrf, score_aware"
            )

    def _create_reranker(self, config: Dict[str, Any]) -> Reranker:
        """Create reranker sub-component."""
        reranker_type = config.get("type", "identity")
        reranker_config = config.get("config", {})

        logger.info(f"🔧 Creating reranker: type={reranker_type}, config keys={list(reranker_config.keys())}")

        if reranker_type == "semantic":
            reranker = SemanticReranker(reranker_config)
        elif reranker_type == "identity":
            reranker = IdentityReranker(reranker_config)
        elif reranker_type == "neural":
            try:
                reranker = NeuralReranker(reranker_config)
                logger.info(f"✅ NeuralReranker created successfully: "
                            f"enabled={reranker.enabled}, initialized={reranker._initialized}")
            except Exception as e:
                logger.error(f"❌ Failed to create NeuralReranker: {e}")
                logger.warning("Falling back to IdentityReranker")
                reranker = IdentityReranker({"enabled": True})
        else:
            raise ValueError(f"Unknown reranker type: {reranker_type}")

        return reranker

    def retrieve(self, query: str, k: int = 5) -> List[RetrievalResult]:
        """
        Retrieve relevant documents using modular hybrid search.

        This method orchestrates the complete retrieval pipeline:
        1. Generate query embeddings
        2. Perform dense vector search
        3. Perform sparse keyword search
        4. Fuse results using the configured strategy
        5. Apply reranking if enabled

        Args:
            query: Search query string
            k: Number of results to return

        Returns:
            List of retrieval results sorted by relevance score
        """
        start_time = time.time()

        try:
            # Validation
            if k <= 0:
                raise ValueError("k must be positive")
            if not query.strip():
                raise ValueError("Query cannot be empty")
            if not self.documents:
                raise RuntimeError("No documents have been indexed")

            logger.info(f"🔍 MODULAR RETRIEVER: Starting retrieval for query: '{query}' (k={k})")
            logger.info(f"📚 CORPUS STATUS: {len(self.documents)} documents indexed")

            # Step 1: Generate query embeddings
            query_embedding = np.array(self.embedder.embed([query])[0])
            logger.info(f"🔤 QUERY EMBEDDING: Generated {query_embedding.shape} dimensional vector")

            # Step 2: Dense vector search (with efficiency optimization)
            # Number of candidates to pull from each retriever before fusion
            candidate_count = int(self.max_candidates_multiplier * k) if self.composite_filtering_enabled else k * 2
            logger.info(f"🎯 DENSE SEARCH: Searching for top {candidate_count} candidates")
            dense_results = self.vector_index.search(query_embedding, k=candidate_count)
            logger.info(f"✅ DENSE RESULTS: {len(dense_results)} documents found")

            # Log top dense results with scores
            if dense_results:
                logger.info("📊 TOP DENSE SCORES:")
                for i, (doc_idx, score) in enumerate(dense_results[:3]):
                    if doc_idx < len(self.documents):
                        doc_title = self.documents[doc_idx].metadata.get('title', f'doc_{doc_idx}')[:50]
                        logger.info(f"  {i+1}. [{doc_idx}] {doc_title}... → {score:.4f}")
            else:
                logger.warning("⚠️ DENSE SEARCH: No results found!")

            # Step 3: Sparse keyword search (with efficiency optimization)
            logger.info(f"🔎 SPARSE SEARCH: BM25 keyword search for '{query}' (k={candidate_count})")
            sparse_results = self.sparse_retriever.search(query, k=candidate_count)
            logger.info(f"✅ SPARSE RESULTS: {len(sparse_results)} documents found")

            # Log top sparse results with scores
            if sparse_results:
                logger.info("📊 TOP SPARSE SCORES:")
                for i, (doc_idx, score) in enumerate(sparse_results[:3]):
                    if doc_idx < len(self.documents):
                        doc_title = self.documents[doc_idx].metadata.get('title', f'doc_{doc_idx}')[:50]
                        logger.info(f"  {i+1}. [{doc_idx}] {doc_title}... → {score:.4f}")
            else:
                logger.warning("⚠️ SPARSE SEARCH: No results found!")

            # Step 3.5: Set documents and query for graph enhancement (if supported)
            if hasattr(self.fusion_strategy, 'set_documents_and_query'):
                self.fusion_strategy.set_documents_and_query(self.documents, query)
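
            # For the default RRF strategy, fusion is rank-based rather than
            # score-based: classic reciprocal rank fusion scores a document as
            #   score(d) = sum over result lists of w_list / (k + rank_list(d)),
            # with k typically 60 (see the "fusion" config in the class docstring).
            # The exact formula is defined by the configured FusionStrategy
            # implementation (RRFFusion, WeightedFusion, etc.).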
→ {score:.4f}") else: logger.warning(f"⚠️ SPARSE SEARCH: No results found!") # Step 3.5: Set documents and query for graph enhancement (if supported) if hasattr(self.fusion_strategy, 'set_documents_and_query'): self.fusion_strategy.set_documents_and_query(self.documents, query) # Step 4: Fuse results fusion_name = self.fusion_strategy.__class__.__name__ logger.info(f"🔄 FUSION STRATEGY: Using {fusion_name} to combine results") fused_results = self.fusion_strategy.fuse_results(dense_results, sparse_results) logger.info(f"✅ FUSION RESULTS: {len(fused_results)} documents after fusion") # Log top fused results with scores if fused_results: logger.info(f"📊 TOP FUSED SCORES:") for i, (doc_idx, score) in enumerate(fused_results[:5]): if doc_idx < len(self.documents): doc_title = self.documents[doc_idx].metadata.get('title', f'doc_{doc_idx}')[:50] logger.info(f" {i+1}. [{doc_idx}] {doc_title}... → {score:.4f}") else: logger.warning(f"⚠️ FUSION: No results after fusion!") # Step 4.5: Composite filtering (NEW) or semantic gap detection (LEGACY) if self.composite_filtering_enabled: # NEW: Individual document composite scoring filtered_results = self._calculate_composite_scores(query_embedding, fused_results) if not filtered_results: logger.info("Composite filtering: No documents passed quality threshold") return [] fused_results = filtered_results # Use filtered results for reranking else: # LEGACY: Global semantic gap detection (DEPRECATED) if fused_results and self.min_semantic_alignment > 0: semantic_alignment = self._calculate_semantic_alignment(query_embedding, fused_results[:5]) if semantic_alignment < self.min_semantic_alignment: logger.info(f"Query-document semantic alignment too low: {semantic_alignment:.3f} < {self.min_semantic_alignment}") return [] # No semantically relevant documents found # Step 5: Apply reranking if enabled if self.reranker.is_enabled() and fused_results: reranker_name = self.reranker.__class__.__name__ logger.info(f"🧠 RERANKING: Using {reranker_name} to improve relevance") # Prepare documents and scores for reranking top_candidates = fused_results[:k*2] # Rerank top candidates candidate_documents = [self.documents[idx] for idx, _ in top_candidates] candidate_scores = [score for _, score in top_candidates] logger.info(f"🔄 RERANKING: Processing {len(top_candidates)} candidates") reranked_results = self.reranker.rerank(query, candidate_documents, candidate_scores) # Update final results with reranked scores final_results = [] for local_idx, reranked_score in reranked_results: if local_idx < len(top_candidates): original_idx = top_candidates[local_idx][0] final_results.append((original_idx, reranked_score)) # Add remaining documents that weren't reranked reranked_indices = {top_candidates[local_idx][0] for local_idx, _ in reranked_results if local_idx < len(top_candidates)} for doc_idx, score in fused_results: if doc_idx not in reranked_indices: final_results.append((doc_idx, score)) # Sort by final score and limit to k final_results.sort(key=lambda x: x[1], reverse=True) final_results = final_results[:k] logger.info(f"✅ RERANKING: Final {len(final_results)} results after reranking") else: # No reranking, use fused results directly logger.info(f"⏭️ RERANKING: Skipped (reranker disabled or no results)") final_results = fused_results[:k] # Convert to RetrievalResult objects retrieval_results = [] for doc_idx, score in final_results: if doc_idx < len(self.documents): document = self.documents[doc_idx] retrieval_result = RetrievalResult( document=document, score=float(score), 
retrieval_method="modular_unified_hybrid" ) retrieval_results.append(retrieval_result) # Log final results summary logger.info(f"🎯 FINAL RETRIEVAL RESULTS: {len(retrieval_results)} documents") if retrieval_results: logger.info(f"📊 FINAL RANKING:") for i, result in enumerate(retrieval_results): doc_title = result.document.metadata.get('title', f'doc_{result.document.content[:30]}')[:50] logger.info(f" {i+1}. {doc_title}... → {result.score:.4f}") else: logger.warning(f"❌ NO RESULTS: Query '{query}' returned no relevant documents!") # Update performance stats elapsed_time = time.time() - start_time self.retrieval_stats["total_retrievals"] += 1 self.retrieval_stats["total_time"] += elapsed_time self.retrieval_stats["avg_time"] = ( self.retrieval_stats["total_time"] / self.retrieval_stats["total_retrievals"] ) self.retrieval_stats["last_retrieval_time"] = elapsed_time # Log performance summary logger.info(f"⚡ RETRIEVAL PERFORMANCE: {elapsed_time*1000:.1f}ms total, {len(retrieval_results)}/{k} results") logger.info(f"🏁 RETRIEVAL COMPLETE: Query '{query}' processed successfully") # Track performance using platform services if self.platform: self.platform.track_component_performance( self, "document_retrieval", { "success": True, "retrieval_time": elapsed_time, "query": query, "results_count": len(retrieval_results), "k_requested": k, "indexed_documents": len(self.documents) } ) return retrieval_results except Exception as e: # Track failure using platform services if self.platform: elapsed_time = time.time() - start_time self.platform.track_component_performance( self, "document_retrieval", { "success": False, "retrieval_time": elapsed_time, "query": query, "k_requested": k, "indexed_documents": len(self.documents), "error": str(e) } ) logger.error(f"Modular retrieval failed: {str(e)}") raise RuntimeError(f"Modular retrieval failed: {str(e)}") from e def index_documents(self, documents: List[Document]) -> None: """ Index documents in all sub-components. Args: documents: List of documents to index """ if not documents: raise ValueError("Cannot index empty document list") # Store documents (extend existing instead of replacing) if not hasattr(self, 'documents') or self.documents is None: self.documents = [] self.documents.extend(documents) # Get embedding dimension from first document if documents[0].embedding is not None: embedding_dim = len(documents[0].embedding) else: raise ValueError("Documents must have embeddings before indexing") # Initialize index only if not already initialized if not hasattr(self.vector_index, 'index') or self.vector_index.index is None: self.vector_index.initialize_index(embedding_dim) # Add documents to vector index self.vector_index.add_documents(documents) # Index in sparse retriever (this needs fixing too) self.sparse_retriever.index_documents(documents) logger.info(f"Indexed {len(documents)} documents in all sub-components") def get_retrieval_stats(self) -> Dict[str, Any]: """ Get comprehensive statistics about the modular retrieval system. 

        Returns:
            Dictionary with retrieval statistics and sub-component information
        """
        stats = {
            "component_type": "modular_unified_retriever",
            "indexed_documents": len(self.documents),
            "retrieval_stats": self.retrieval_stats.copy(),
            "sub_components": {
                "vector_index": self.vector_index.get_index_info(),
                "sparse_retriever": self.sparse_retriever.get_stats(),
                "fusion_strategy": self.fusion_strategy.get_strategy_info(),
                "reranker": self.reranker.get_reranker_info()
            }
        }
        return stats

    def get_component_info(self) -> Dict[str, Any]:
        """
        Get detailed information about all sub-components.

        Returns:
            Dictionary with component details for logging
        """
        return {
            "vector_index": self.vector_index.get_component_info(),
            "sparse_retriever": self.sparse_retriever.get_component_info(),
            "fusion_strategy": self.fusion_strategy.get_component_info(),
            "reranker": self.reranker.get_component_info()
        }

    def supports_batch_queries(self) -> bool:
        """
        Check if this retriever supports batch query processing.

        Returns:
            False, as the current implementation processes queries individually
        """
        return False

    def get_configuration(self) -> Dict[str, Any]:
        """
        Get the current configuration of the modular retriever.

        Returns:
            Dictionary with configuration parameters
        """
        return {
            "vector_index": {
                "type": "faiss",
                "config": self.vector_index.get_index_info()
            },
            "sparse": {
                "type": "bm25",
                "config": self.sparse_retriever.get_stats()
            },
            "fusion": {
                "type": type(self.fusion_strategy).__name__.lower().replace("fusion", ""),
                "config": self.fusion_strategy.get_strategy_info()
            },
            "reranker": {
                "type": type(self.reranker).__name__.lower().replace("reranker", ""),
                "config": self.reranker.get_reranker_info()
            }
        }

    def clear_index(self) -> None:
        """
        Clear all indexed documents and reset all sub-components.
        """
        self.documents.clear()
        self.vector_index.clear()
        self.sparse_retriever.clear()

        # Reset performance stats
        self.retrieval_stats = {
            "total_retrievals": 0,
            "total_time": 0.0,
            "avg_time": 0.0,
            "last_retrieval_time": 0.0
        }

        logger.info("Cleared all documents from modular retriever")

    def _consider_backend_switch(self, error: Exception) -> None:
        """
        Consider switching to a different backend due to an error.

        This method is used for performance testing of backend switching.
        In a real implementation, this would switch to a fallback backend.

        Args:
            error: The exception that triggered the switch consideration
        """
        logger.warning(f"Backend switch consideration triggered by: {error}")

        # Simulate backend switching logic
        if self.active_backend_name == "faiss":
            self.active_backend_name = "weaviate"
        else:
            self.active_backend_name = "faiss"

        self.backend_switch_count += 1
        logger.info(f"Switched to backend: {self.active_backend_name} "
                    f"(switch count: {self.backend_switch_count})")

    # Standard ComponentBase interface implementation

    def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
        """Initialize platform services for the component.

        Args:
            platform: PlatformOrchestrator instance providing services
        """
        self.platform = platform
        logger.info("ModularUnifiedRetriever initialized with platform services")

    def get_health_status(self) -> HealthStatus:
        """Get the current health status of the component.

        Returns:
            HealthStatus object with component health information
        """
        if self.platform:
            return self.platform.check_component_health(self)

        # Fallback if platform services not initialized
        is_healthy = True
        issues = []

        # Check sub-components
        if not hasattr(self.vector_index, 'get_index_info'):
            is_healthy = False
            issues.append("Vector index not properly initialized")

        if not hasattr(self.sparse_retriever, 'get_stats'):
            is_healthy = False
            issues.append("Sparse retriever not properly initialized")

        if not hasattr(self.fusion_strategy, 'get_strategy_info'):
            is_healthy = False
            issues.append("Fusion strategy not properly initialized")

        if not hasattr(self.reranker, 'get_reranker_info'):
            is_healthy = False
            issues.append("Reranker not properly initialized")

        return HealthStatus(
            is_healthy=is_healthy,
            issues=issues,
            metrics={
                "indexed_documents": len(self.documents),
                "retrieval_stats": self.retrieval_stats,
                "sub_components": self.get_component_info()
            },
            component_name=self.__class__.__name__
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get component-specific metrics.

        Returns:
            Dictionary containing component metrics
        """
        if self.platform:
            try:
                component_metrics = self.platform.analytics_service.collect_component_metrics(self)
                return {
                    "component_name": component_metrics.component_name,
                    "component_type": component_metrics.component_type,
                    "success_count": component_metrics.success_count,
                    "error_count": component_metrics.error_count,
                    "resource_usage": component_metrics.resource_usage,
                    "performance_metrics": component_metrics.performance_metrics,
                    "timestamp": component_metrics.timestamp
                }
            except Exception as e:
                # Fall back to local metrics if the platform service fails
                logger.debug(f"Platform metrics collection failed: {e}")

        # Fallback if platform services not initialized
        return {
            "indexed_documents": len(self.documents),
            "retrieval_stats": self.retrieval_stats,
            "sub_components": self.get_component_info(),
            "configuration": self.get_configuration()
        }

    def get_capabilities(self) -> List[str]:
        """Get list of component capabilities.

        Returns:
            List of capability strings
        """
        capabilities = [
            "hybrid_retrieval",
            "modular_architecture",
            "dense_search",
            "sparse_search",
            "result_fusion",
            "reranking"
        ]

        # Add vector index capabilities
        if hasattr(self.vector_index, 'get_capabilities'):
            capabilities.extend([f"vector_{cap}" for cap in self.vector_index.get_capabilities()])

        # Add sparse retriever capabilities
        if hasattr(self.sparse_retriever, 'get_capabilities'):
            capabilities.extend([f"sparse_{cap}" for cap in self.sparse_retriever.get_capabilities()])

        # Add fusion strategy capabilities
        if hasattr(self.fusion_strategy, 'get_capabilities'):
            capabilities.extend([f"fusion_{cap}" for cap in self.fusion_strategy.get_capabilities()])

        # Add reranker capabilities
        if hasattr(self.reranker, 'get_capabilities'):
            capabilities.extend([f"reranker_{cap}" for cap in self.reranker.get_capabilities()])

        return capabilities

    def get_document_count(self) -> int:
        """Get the number of documents in the retriever."""
        return len(self.documents)

    def get_sub_component_performance(self) -> Dict[str, Any]:
        """
        Get performance information for each sub-component.

        Returns:
            Dictionary with performance metrics
        """
        performance = {
            "vector_index": {
                "document_count": self.vector_index.get_document_count(),
                "is_trained": self.vector_index.is_trained()
            },
            "sparse_retriever": {
                "document_count": self.sparse_retriever.get_document_count(),
                "stats": self.sparse_retriever.get_stats()
            },
            "fusion_strategy": {
                "info": self.fusion_strategy.get_strategy_info()
            },
            "reranker": {
                "enabled": self.reranker.is_enabled(),
                "info": self.reranker.get_reranker_info()
            }
        }
        return performance

    def debug_retrieval(self, query: str, k: int = 5) -> Dict[str, Any]:
        """
        Perform retrieval with detailed debugging information.

        Args:
            query: Search query
            k: Number of results to return

        Returns:
            Dictionary with step-by-step retrieval information
        """
        debug_info = {
            "query": query,
            "k": k,
            "steps": {}
        }

        try:
            # Step 1: Query embedding (same embedding path as retrieve())
            query_embedding = np.array(self.embedder.embed([query])[0])
            debug_info["steps"]["embedding"] = {
                "embedding_dim": len(query_embedding),
                "embedding_norm": float(np.linalg.norm(query_embedding))
            }

            # Step 2: Dense search
            dense_results = self.vector_index.search(query_embedding, k=k * 2)
            debug_info["steps"]["dense_search"] = {
                "results_count": len(dense_results),
                "top_scores": [score for _, score in dense_results[:5]]
            }

            # Step 3: Sparse search
            sparse_results = self.sparse_retriever.search(query, k=k * 2)
            debug_info["steps"]["sparse_search"] = {
                "results_count": len(sparse_results),
                "top_scores": [score for _, score in sparse_results[:5]]
            }

            # Step 3.5: Set documents and query for graph enhancement (if supported)
            if hasattr(self.fusion_strategy, 'set_documents_and_query'):
                self.fusion_strategy.set_documents_and_query(self.documents, query)

            # Step 4: Fusion
            fused_results = self.fusion_strategy.fuse_results(dense_results, sparse_results)
            debug_info["steps"]["fusion"] = {
                "results_count": len(fused_results),
                "top_scores": [score for _, score in fused_results[:5]],
                "fusion_type": type(self.fusion_strategy).__name__
            }

            # Step 5: Reranking
            if self.reranker.is_enabled():
                top_candidates = fused_results[:k * 2]
                candidate_documents = [self.documents[idx] for idx, _ in top_candidates]
                candidate_scores = [score for _, score in top_candidates]

                reranked_results = self.reranker.rerank(query, candidate_documents, candidate_scores)
                debug_info["steps"]["reranking"] = {
                    "enabled": True,
                    "candidates_count": len(candidate_documents),
                    "reranked_count": len(reranked_results),
                    "top_reranked_scores": [score for _, score in reranked_results[:5]]
                }
            else:
                debug_info["steps"]["reranking"] = {"enabled": False}

        except Exception as e:
            debug_info["error"] = str(e)

        return debug_info
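
    # Worked example for the composite score computed below, using the default
    # weights (fusion_weight=0.7, semantic_weight=0.3, min_composite_score=0.4):
    # a document with a normalized fusion score of 0.8 and a query-document
    # cosine similarity of 0.5 scores 0.7 * 0.8 + 0.3 * 0.5 = 0.71 and passes
    # the threshold, while one with 0.3 and 0.2 scores 0.27 and is filtered out.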

    def _calculate_composite_scores(self, query_embedding: np.ndarray,
                                    fused_results: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
        """
        Calculate composite scores for individual documents, combining fusion scores
        and semantic similarity.

        This method replaces the global semantic gap detection with per-document
        quality assessment. Each document gets a composite score:
        α * fusion_score + β * semantic_similarity.
        Only documents above the composite threshold are included.

        Args:
            query_embedding: Query embedding vector
            fused_results: List of (document_index, fusion_score) from the fusion strategy

        Returns:
            List of (document_index, composite_score) for documents that pass the threshold
        """
        if not fused_results:
            return []

        try:
            # Keep only in-range document indices so scores stay aligned with the
            # embeddings computed below
            valid_results = [
                (doc_idx, score) for doc_idx, score in fused_results
                if doc_idx < len(self.documents)
            ]
            if not valid_results:
                return []

            # Normalize fusion scores to the 0-1 range for a fair combination
            fusion_scores = [score for _, score in valid_results]
            if len(set(fusion_scores)) > 1:  # Only normalize if there's variation
                min_score, max_score = min(fusion_scores), max(fusion_scores)
                score_range = max_score - min_score
                if score_range > 0:
                    normalized_fusion = [(score - min_score) / score_range for score in fusion_scores]
                else:
                    normalized_fusion = [1.0] * len(fusion_scores)  # All scores identical
            else:
                normalized_fusion = [1.0] * len(fusion_scores)  # All scores identical

            # Embed the candidate documents
            doc_texts = [self.documents[doc_idx].content for doc_idx, _ in valid_results]
            doc_embeddings = self.embedder.embed(doc_texts)

            # Calculate composite scores for each document
            composite_results = []
            for i, (doc_idx, original_fusion_score) in enumerate(valid_results):
                if i >= len(doc_embeddings):
                    continue

                # Cosine similarity between query and document
                doc_emb_array = np.array(doc_embeddings[i])
                query_norm = query_embedding / np.linalg.norm(query_embedding)
                doc_norm = doc_emb_array / np.linalg.norm(doc_emb_array)
                semantic_similarity = np.dot(query_norm, doc_norm)

                # Weighted combination of fusion score and semantic similarity
                normalized_fusion_score = normalized_fusion[i]
                composite_score = (self.fusion_weight * normalized_fusion_score +
                                   self.semantic_weight * semantic_similarity)

                # Apply threshold filter
                if composite_score >= self.min_composite_score:
                    composite_results.append((doc_idx, composite_score))

                # Debug logging for the first few documents
                if i < 3:
                    logger.info(f"COMPOSITE DEBUG - Doc {i+1}: fusion={original_fusion_score:.3f}, "
                                f"norm_fusion={normalized_fusion_score:.3f}, semantic={semantic_similarity:.3f}, "
                                f"composite={composite_score:.3f}, threshold={self.min_composite_score}")

            # Sort by composite score (descending) and return
            composite_results.sort(key=lambda x: x[1], reverse=True)

            logger.info(f"COMPOSITE FILTERING - {len(fused_results)} input → "
                        f"{len(composite_results)} passed threshold")

            return composite_results

        except Exception as e:
            logger.warning(f"Error in composite scoring: {e}")
            # Fall back to the original fusion results
            return fused_results
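
    # The legacy alignment check below averages plain cosine similarity,
    #   sim(q, d) = (q · d) / (||q|| * ||d||),
    # over the top fused documents and rejects the entire result set when the
    # average falls below min_semantic_alignment.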

    def _calculate_semantic_alignment(self, query_embedding: np.ndarray,
                                      fused_results: List[Tuple[int, float]]) -> float:
        """
        Calculate semantic alignment between query and top retrieved documents.

        Args:
            query_embedding: Query embedding vector
            fused_results: List of (document_index, score) from fusion

        Returns:
            Average cosine similarity between query and top documents
        """
        if not fused_results:
            return 0.0

        try:
            # Get embeddings for top documents
            top_doc_indices = [doc_idx for doc_idx, _ in fused_results]
            top_documents = [self.documents[idx] for idx in top_doc_indices if idx < len(self.documents)]

            if not top_documents:
                return 0.0

            # Extract text content for embedding
            doc_texts = [doc.content for doc in top_documents]

            # Get document embeddings
            doc_embeddings = self.embedder.embed(doc_texts)

            # Calculate cosine similarities
            similarities = []
            for i, doc_embedding in enumerate(doc_embeddings):
                doc_emb_array = np.array(doc_embedding)

                # Normalize vectors for cosine similarity
                query_norm = query_embedding / np.linalg.norm(query_embedding)
                doc_norm = doc_emb_array / np.linalg.norm(doc_emb_array)
                similarity = np.dot(query_norm, doc_norm)
                similarities.append(similarity)

                # Debug: log individual document similarities for investigation
                if i < 3:  # Only log the first 3 docs to avoid spam
                    doc_preview = doc_texts[i][:100] + "..." if len(doc_texts[i]) > 100 else doc_texts[i]
                    logger.debug(f"Doc {i+1} similarity: {similarity:.3f} - {doc_preview}")

            # Return the average similarity
            avg_similarity = np.mean(similarities) if similarities else 0.0
            logger.debug(f"Semantic alignment: {len(similarities)} docs, "
                         f"similarities={[f'{s:.3f}' for s in similarities[:5]]}, avg={avg_similarity:.3f}")

            return float(avg_similarity)

        except Exception as e:
            logger.warning(f"Error calculating semantic alignment: {e}")
            return 0.0  # Conservative fallback
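

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, mirroring the class docstring example).
# `my_embedder` and `embedded_documents` are placeholders: any concrete
# Embedder implementation works, and documents must already carry embeddings
# before index_documents() is called.
#
#     config = {
#         "vector_index": {"type": "faiss", "config": {"index_type": "IndexFlatIP"}},
#         "sparse": {"type": "bm25", "config": {"k1": 1.2, "b": 0.75}},
#         "fusion": {"type": "rrf", "config": {"k": 60, "weights": {"dense": 0.7, "sparse": 0.3}}},
#         "reranker": {"type": "identity", "config": {"enabled": False}},
#     }
#     retriever = ModularUnifiedRetriever(config, my_embedder)
#     retriever.index_documents(embedded_documents)
#     results = retriever.retrieve("How are dense and sparse results fused?", k=5)
# ---------------------------------------------------------------------------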