Arthur Passuello
initial commit
5e1a30c
"""
Reciprocal Rank Fusion implementation for Modular Retriever Architecture.
This module provides a direct implementation of the RRF algorithm
extracted from the existing fusion system for improved modularity.
"""
import logging
from typing import List, Dict, Any, Tuple
from collections import defaultdict
from .base import FusionStrategy
logger = logging.getLogger(__name__)
class RRFFusion(FusionStrategy):
"""
Reciprocal Rank Fusion (RRF) implementation.
This is a direct implementation of the RRF algorithm that combines
dense and sparse retrieval results using rank-based scoring.
No external dependencies are required.
RRF Formula: score = Σ weight_i / (k + rank_i)
Where rank_i is the 1-based rank of document in result list i
Features:
- Configurable RRF constant (k) parameter
- Weighted fusion with dense/sparse weights
- Handles empty result sets gracefully
- Preserves rank-based scoring benefits
- Optimized for performance
Example:
config = {
"k": 60,
"weights": {
"dense": 0.7,
"sparse": 0.3
}
}
fusion = RRFFusion(config)
results = fusion.fuse_results(dense_results, sparse_results)
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize RRF fusion strategy.
Args:
config: Configuration dictionary with:
- k: RRF constant controlling rank influence (default: 60)
- weights: Dictionary with dense and sparse weights
- dense: Weight for dense retrieval (default: 0.7)
- sparse: Weight for sparse retrieval (default: 0.3)
"""
self.config = config
self.k = config.get("k", 60)
# Extract weights
weights = config.get("weights", {})
self.dense_weight = weights.get("dense", 0.7)
self.sparse_weight = weights.get("sparse", 0.3)
# Validation
if self.k <= 0:
raise ValueError("k must be positive")
if not 0 <= self.dense_weight <= 1:
raise ValueError("dense_weight must be between 0 and 1")
if not 0 <= self.sparse_weight <= 1:
raise ValueError("sparse_weight must be between 0 and 1")
# Normalize weights if they don't sum to 1
weight_sum = self.dense_weight + self.sparse_weight
if weight_sum > 0:
self.dense_weight /= weight_sum
self.sparse_weight /= weight_sum
else:
self.dense_weight = 0.7
self.sparse_weight = 0.3
logger.info(f"RRFFusion initialized with k={self.k}, dense_weight={self.dense_weight:.3f}")
def fuse_results(
self,
dense_results: List[Tuple[int, float]],
sparse_results: List[Tuple[int, float]]
) -> List[Tuple[int, float]]:
"""
Fuse dense and sparse retrieval results using RRF.
Args:
dense_results: List of (document_index, score) from dense retrieval
sparse_results: List of (document_index, score) from sparse retrieval
Returns:
List of (document_index, fused_score) tuples sorted by score
"""
# Handle empty results
if not dense_results and not sparse_results:
return []
if not dense_results:
return sparse_results[:] if sparse_results else []
if not sparse_results:
return dense_results[:] if dense_results else []
# Calculate RRF scores for each unique document
rrf_scores: Dict[int, float] = defaultdict(float)
# Add dense retrieval scores (rank-based)
for rank, (doc_idx, _) in enumerate(dense_results, 1):
rrf_scores[doc_idx] += self.dense_weight / (self.k + rank)
# Add sparse retrieval scores (rank-based)
for rank, (doc_idx, _) in enumerate(sparse_results, 1):
rrf_scores[doc_idx] += self.sparse_weight / (self.k + rank)
# Convert to sorted list
fused_results = [
(doc_idx, score) for doc_idx, score in rrf_scores.items()
]
# Sort by RRF score (descending)
fused_results.sort(key=lambda x: x[1], reverse=True)
return fused_results
def get_strategy_info(self) -> Dict[str, Any]:
"""
Get information about the RRF fusion strategy.
Returns:
Dictionary with strategy configuration and statistics
"""
return {
"algorithm": "reciprocal_rank_fusion",
"k": self.k,
"dense_weight": self.dense_weight,
"sparse_weight": self.sparse_weight,
"parameters": {
"k": self.k,
"weights": {
"dense": self.dense_weight,
"sparse": self.sparse_weight
}
}
}
def update_weights(self, dense_weight: float, sparse_weight: float) -> None:
"""
Update fusion weights dynamically.
Args:
dense_weight: New weight for dense retrieval
sparse_weight: New weight for sparse retrieval
"""
if not 0 <= dense_weight <= 1:
raise ValueError("dense_weight must be between 0 and 1")
if not 0 <= sparse_weight <= 1:
raise ValueError("sparse_weight must be between 0 and 1")
# Normalize weights
weight_sum = dense_weight + sparse_weight
if weight_sum > 0:
self.dense_weight = dense_weight / weight_sum
self.sparse_weight = sparse_weight / weight_sum
else:
raise ValueError("At least one weight must be positive")
logger.info(f"Updated RRF weights: dense={self.dense_weight:.3f}, sparse={self.sparse_weight:.3f}")
def calculate_individual_scores(
self,
dense_results: List[Tuple[int, float]],
sparse_results: List[Tuple[int, float]]
) -> Dict[int, Dict[str, float]]:
"""
Calculate individual RRF scores for debugging purposes.
Args:
dense_results: List of (document_index, score) from dense retrieval
sparse_results: List of (document_index, score) from sparse retrieval
Returns:
Dictionary mapping document_index to individual score components
"""
scores = defaultdict(lambda: {"dense": 0.0, "sparse": 0.0, "total": 0.0})
# Calculate dense scores
for rank, (doc_idx, _) in enumerate(dense_results, 1):
dense_score = self.dense_weight / (self.k + rank)
scores[doc_idx]["dense"] = dense_score
scores[doc_idx]["total"] += dense_score
# Calculate sparse scores
for rank, (doc_idx, _) in enumerate(sparse_results, 1):
sparse_score = self.sparse_weight / (self.k + rank)
scores[doc_idx]["sparse"] = sparse_score
scores[doc_idx]["total"] += sparse_score
return dict(scores)
def get_optimal_k(self, result_size: int) -> int:
"""
Get optimal k value based on result size.
Args:
result_size: Expected final result size
Returns:
Optimal k value
"""
# Adaptive k selection based on result size
# Smaller k for larger result sets, larger k for smaller sets
return max(5, min(60, result_size * 3))
def set_k(self, k: int) -> None:
"""
Update the RRF k parameter.
Args:
k: New RRF constant value
"""
if k <= 0:
raise ValueError("k must be positive")
self.k = k
logger.info(f"Updated RRF k parameter to {k}")
def get_rank_contributions(
self,
dense_results: List[Tuple[int, float]],
sparse_results: List[Tuple[int, float]]
) -> Dict[int, Dict[str, Any]]:
"""
Get detailed rank contributions for analysis.
Args:
dense_results: List of (document_index, score) from dense retrieval
sparse_results: List of (document_index, score) from sparse retrieval
Returns:
Dictionary with rank contributions for each document
"""
contributions = defaultdict(lambda: {
"dense_rank": None,
"sparse_rank": None,
"dense_contribution": 0.0,
"sparse_contribution": 0.0
})
# Track dense ranks and contributions
for rank, (doc_idx, _) in enumerate(dense_results, 1):
contributions[doc_idx]["dense_rank"] = rank
contributions[doc_idx]["dense_contribution"] = self.dense_weight / (self.k + rank)
# Track sparse ranks and contributions
for rank, (doc_idx, _) in enumerate(sparse_results, 1):
contributions[doc_idx]["sparse_rank"] = rank
contributions[doc_idx]["sparse_contribution"] = self.sparse_weight / (self.k + rank)
return dict(contributions)