# enhanced-rag-demo / src / fusion.py
# Author: Arthur Passuello — initial commit (5e1a30c)
"""
Reciprocal Rank Fusion for combining dense and sparse retrieval results.
Implements the RRF algorithm with configurable weighting.
"""
from typing import List, Tuple, Dict
from collections import defaultdict
def reciprocal_rank_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    k: int = 60,
) -> List[Tuple[int, float]]:
    """
    Combine dense and sparse retrieval using Reciprocal Rank Fusion.

    RRF Formula: score = Σ weight_i / (k + rank_i)
    where rank_i is the 1-based rank of the document in result list i.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...] sorted by relevance
        sparse_results: [(chunk_idx, bm25_score), ...] sorted by relevance
        dense_weight: Weight for semantic similarity (default 0.7);
            the sparse weight is 1 - dense_weight
        k: RRF constant controlling rank influence (default 60)

    Returns:
        Fused results as [(chunk_idx, rrf_score), ...] sorted by combined
        score, descending. Scores are ALWAYS on the RRF scale, even when
        one (or both) of the input lists is empty.

    Raises:
        ValueError: If dense_weight is outside [0, 1] or k <= 0

    Performance: O((n + m) log(n + m)) where n, m are the input list
    lengths (the final sort dominates).
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")
    if k <= 0:
        raise ValueError("k must be positive")
    sparse_weight = 1.0 - dense_weight

    # BUG FIX: the original returned the raw input list whenever the other
    # list was empty, leaking similarity/BM25 scores on a completely
    # different scale than RRF scores. Empty lists are handled naturally
    # below (the corresponding loop simply contributes nothing), so every
    # returned score is now a consistent RRF score; the relative ordering
    # in the single-list case is unchanged.
    rrf_scores: Dict[int, float] = defaultdict(float)

    # Dense contribution: rank-based, 1-indexed.
    for rank, (chunk_idx, _) in enumerate(dense_results, 1):
        rrf_scores[chunk_idx] += dense_weight / (k + rank)

    # Sparse contribution: rank-based, 1-indexed.
    for rank, (chunk_idx, _) in enumerate(sparse_results, 1):
        rrf_scores[chunk_idx] += sparse_weight / (k + rank)

    # Sort by fused score, best first.
    return sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)
def _min_max_normalize(results: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
    """Min-max scale scores to [0, 1]; constant-score lists map to all zeros."""
    if not results:
        return results
    scores = [score for _, score in results]
    lo, hi = min(scores), max(scores)
    # Avoid division by zero when every score is identical.
    span = hi - lo if hi > lo else 1.0
    return [(idx, (score - lo) / span) for idx, score in results]


def weighted_score_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    normalize: bool = True,
) -> List[Tuple[int, float]]:
    """
    Alternative fusion using direct score weighting (not rank-based).

    Score Formula:
        final_score = dense_weight * dense_score + sparse_weight * sparse_score
    A document missing from one list contributes 0.0 from that list.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7);
            sparse weight is 1 - dense_weight
        normalize: Whether to min-max normalize scores to [0, 1] per list

    Returns:
        Fused results [(chunk_idx, weighted_score), ...] sorted descending.

    Raises:
        ValueError: If dense_weight is outside [0, 1]

    Note: Normalizes input scores by default so that dense similarities
    and BM25 scores (which live on different scales) are weighted fairly.
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")
    sparse_weight = 1.0 - dense_weight

    # Normalization extracted to a helper: the original duplicated the
    # min/max logic for both lists with dead `if results else` guards
    # inside branches already guarded by `if normalize and results`.
    if normalize:
        dense_results = _min_max_normalize(dense_results)
        sparse_results = _min_max_normalize(sparse_results)

    # Dict lookup for O(1) access per document.
    dense_scores = dict(dense_results)
    sparse_scores = dict(sparse_results)

    # Union of document ids from both retrievers; absent side scores 0.0.
    weighted_results = [
        (
            doc_id,
            dense_weight * dense_scores.get(doc_id, 0.0)
            + sparse_weight * sparse_scores.get(doc_id, 0.0),
        )
        for doc_id in dense_scores.keys() | sparse_scores.keys()
    ]

    # Sort by final score (descending).
    weighted_results.sort(key=lambda item: item[1], reverse=True)
    return weighted_results
def adaptive_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    result_size: int = 10,
) -> List[Tuple[int, float]]:
    """
    Pick a fusion strategy based on how many unique documents are involved.

    Small candidate pools (<= 20 unique docs) use weighted score fusion,
    which preserves score variation; larger pools use RRF, which copes
    better with mismatched score scales.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7)
        result_size: Expected final result size; drives the RRF k choice

    Returns:
        Fused results [(chunk_idx, score), ...] sorted by score, descending.
    """
    dense_ids = {idx for idx, _ in dense_results}
    sparse_ids = {idx for idx, _ in sparse_results}
    unique_docs = len(dense_ids | sparse_ids)

    if unique_docs <= 20:
        # Small pool: direct score weighting keeps score differences visible.
        return weighted_score_fusion(
            dense_results, sparse_results, dense_weight, normalize=True
        )

    # Larger pool: RRF with k proportional to the requested result size,
    # clamped to the range [5, 60].
    chosen_k = max(5, min(60, result_size * 3))
    return reciprocal_rank_fusion(
        dense_results, sparse_results, dense_weight, k=chosen_k
    )