""" | |
Reciprocal Rank Fusion for combining dense and sparse retrieval results. | |
Implements the RRF algorithm with configurable weighting. | |
""" | |
from typing import List, Tuple, Dict | |
from collections import defaultdict | |


def reciprocal_rank_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    k: int = 60,
) -> List[Tuple[int, float]]:
    """
    Combine dense and sparse retrieval using Reciprocal Rank Fusion.

    RRF Formula: score = Σ weight_i / (k + rank_i)
    where rank_i is the 1-based rank of the document in result list i.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...] sorted by relevance
        sparse_results: [(chunk_idx, bm25_score), ...] sorted by relevance
        dense_weight: Weight for semantic similarity (default 0.7);
            sparse results receive 1.0 - dense_weight
        k: RRF constant controlling rank influence (default 60)

    Returns:
        Fused results as [(chunk_idx, rrf_score), ...] sorted by combined score

    Raises:
        ValueError: If dense_weight is outside [0, 1] or k is not positive

    Performance: O(n + m) where n, m are the result list lengths
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")
    if k <= 0:
        raise ValueError("k must be positive")
    sparse_weight = 1.0 - dense_weight

    # Empty inputs need no special-casing: each rank loop below simply adds
    # nothing, so one empty list still yields proper RRF scores for the other
    # (returning the raw list here would leak unfused similarity/BM25 scores),
    # and two empty lists yield [].
    # Accumulate rank-based RRF contributions per unique document
    rrf_scores: Dict[int, float] = defaultdict(float)

    # Dense retrieval contributions (1-based ranks)
    for rank, (chunk_idx, _) in enumerate(dense_results, 1):
        rrf_scores[chunk_idx] += dense_weight / (k + rank)

    # Sparse retrieval contributions (1-based ranks)
    for rank, (chunk_idx, _) in enumerate(sparse_results, 1):
        rrf_scores[chunk_idx] += sparse_weight / (k + rank)

    # Sort by fused RRF score (descending)
    fused_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
    return fused_results
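

# A worked example (hypothetical chunk indices and scores) showing how RRF
# combines ranks rather than raw scores. With dense_weight=0.7 and k=60:
#
#   dense  = [(0, 0.91), (3, 0.85), (7, 0.62)]   # cosine similarities
#   sparse = [(3, 12.4), (5, 9.8), (0, 4.1)]     # BM25 scores
#
#   chunk 0: 0.7/(60+1) + 0.3/(60+3) ≈ 0.01624
#   chunk 3: 0.7/(60+2) + 0.3/(60+1) ≈ 0.01621
#   chunk 7: 0.7/(60+3)              ≈ 0.01111
#   chunk 5: 0.3/(60+2)              ≈ 0.00484
#
# reciprocal_rank_fusion(dense, sparse) -> chunks ordered [0, 3, 7, 5].
# Note that the scale gap between 12.4 and 0.91 never matters: only ranks do.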


def weighted_score_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    normalize: bool = True,
) -> List[Tuple[int, float]]:
    """
    Alternative fusion using direct score weighting (not rank-based).

    Score Formula: final_score = dense_weight * dense_score
                                 + sparse_weight * sparse_score

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7)
        normalize: Whether to min-max normalize each list's scores to [0, 1]

    Returns:
        Fused results sorted by weighted score (descending)

    Raises:
        ValueError: If dense_weight is outside [0, 1]

    Note: Normalizes input scores by default so that BM25 and cosine
    similarity scales can be weighted fairly.
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")
    sparse_weight = 1.0 - dense_weight

    # Min-max normalize each list to [0, 1] so the two score scales are comparable
    if normalize and dense_results:
        max_dense = max(score for _, score in dense_results)
        min_dense = min(score for _, score in dense_results)
        range_dense = max_dense - min_dense if max_dense > min_dense else 1.0
        dense_results = [(idx, (score - min_dense) / range_dense) for idx, score in dense_results]
    if normalize and sparse_results:
        max_sparse = max(score for _, score in sparse_results)
        min_sparse = min(score for _, score in sparse_results)
        range_sparse = max_sparse - min_sparse if max_sparse > min_sparse else 1.0
        sparse_results = [(idx, (score - min_sparse) / range_sparse) for idx, score in sparse_results]
    # Convert to dictionaries for O(1) score lookup
    dense_scores = dict(dense_results)
    sparse_scores = dict(sparse_results)

    # Union of all document IDs seen by either retriever
    all_docs = set(dense_scores) | set(sparse_scores)

    # Weighted linear combination; a missing score counts as 0.0
    weighted_results = []
    for doc_id in all_docs:
        dense_score = dense_scores.get(doc_id, 0.0)
        sparse_score = sparse_scores.get(doc_id, 0.0)
        final_score = dense_weight * dense_score + sparse_weight * sparse_score
        weighted_results.append((doc_id, final_score))

    # Sort by final score (descending)
    weighted_results.sort(key=lambda x: x[1], reverse=True)
    return weighted_results
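

# A minimal sketch (hypothetical scores) of why normalization matters here:
# raw BM25 scores (often 5-15) would dominate cosine similarities (0-1) in a
# direct weighted sum. After min-max normalization both lists live in [0, 1]:
#
#   dense  = [(0, 0.91), (3, 0.85)]  -> normalized [(0, 1.0), (3, 0.0)]
#   sparse = [(3, 12.4), (0, 4.1)]   -> normalized [(3, 1.0), (0, 0.0)]
#
#   chunk 0: 0.7 * 1.0 + 0.3 * 0.0 = 0.7
#   chunk 3: 0.7 * 0.0 + 0.3 * 1.0 = 0.3
#
# weighted_score_fusion(dense, sparse, dense_weight=0.7) -> [(0, 0.7), (3, 0.3)]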


def adaptive_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    result_size: int = 10,
) -> List[Tuple[int, float]]:
    """
    Adaptive fusion that chooses between RRF and weighted fusion based on
    the size of the candidate set.

    For small candidate sets (<= 20 unique documents), uses weighted fusion
    to preserve score variation. For larger sets, uses RRF, which handles
    differing score scales more robustly.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7)
        result_size: Expected final result size, used for adaptive k selection

    Returns:
        Fused results with preserved score variation
    """
    # Count the unique candidate documents seen by either retriever
    total_results = len({idx for idx, _ in dense_results} | {idx for idx, _ in sparse_results})

    if total_results <= 20:
        # Small candidate sets: weighted fusion preserves score variation
        return weighted_score_fusion(dense_results, sparse_results, dense_weight, normalize=True)

    # Larger sets: RRF with k scaled to the expected result size
    # (k = 3 * result_size, clamped to [5, 60]); a larger k flattens the
    # influence of top ranks
    adaptive_k = max(5, min(60, result_size * 3))
    return reciprocal_rank_fusion(dense_results, sparse_results, dense_weight, k=adaptive_k)
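

if __name__ == "__main__":
    # Quick demonstration on hypothetical retrieval results (chunk indices
    # and scores invented for illustration only, not part of the module API).
    dense = [(0, 0.91), (3, 0.85), (7, 0.62), (5, 0.40)]
    sparse = [(3, 12.4), (5, 9.8), (0, 4.1), (9, 2.2)]

    print("RRF:     ", reciprocal_rank_fusion(dense, sparse))
    print("Weighted:", weighted_score_fusion(dense, sparse))
    print("Adaptive:", adaptive_fusion(dense, sparse, result_size=4))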