Spaces:
Running
Running
""" | |
Reciprocal Rank Fusion implementation for Modular Retriever Architecture. | |
This module provides a direct implementation of the RRF algorithm | |
extracted from the existing fusion system for improved modularity. | |
""" | |
import logging | |
from typing import List, Dict, Any, Tuple | |
from collections import defaultdict | |
from .base import FusionStrategy | |
logger = logging.getLogger(__name__) | |
class RRFFusion(FusionStrategy): | |
""" | |
Reciprocal Rank Fusion (RRF) implementation. | |
This is a direct implementation of the RRF algorithm that combines | |
dense and sparse retrieval results using rank-based scoring. | |
No external dependencies are required. | |
RRF Formula: score = Σ weight_i / (k + rank_i) | |
Where rank_i is the 1-based rank of document in result list i | |
Features: | |
- Configurable RRF constant (k) parameter | |
- Weighted fusion with dense/sparse weights | |
- Handles empty result sets gracefully | |
- Preserves rank-based scoring benefits | |
- Optimized for performance | |
Example: | |
config = { | |
"k": 60, | |
"weights": { | |
"dense": 0.7, | |
"sparse": 0.3 | |
} | |
} | |
fusion = RRFFusion(config) | |
results = fusion.fuse_results(dense_results, sparse_results) | |
""" | |
def __init__(self, config: Dict[str, Any]): | |
""" | |
Initialize RRF fusion strategy. | |
Args: | |
config: Configuration dictionary with: | |
- k: RRF constant controlling rank influence (default: 60) | |
- weights: Dictionary with dense and sparse weights | |
- dense: Weight for dense retrieval (default: 0.7) | |
- sparse: Weight for sparse retrieval (default: 0.3) | |
""" | |
self.config = config | |
self.k = config.get("k", 60) | |
# Extract weights | |
weights = config.get("weights", {}) | |
self.dense_weight = weights.get("dense", 0.7) | |
self.sparse_weight = weights.get("sparse", 0.3) | |
# Validation | |
if self.k <= 0: | |
raise ValueError("k must be positive") | |
if not 0 <= self.dense_weight <= 1: | |
raise ValueError("dense_weight must be between 0 and 1") | |
if not 0 <= self.sparse_weight <= 1: | |
raise ValueError("sparse_weight must be between 0 and 1") | |
# Normalize weights if they don't sum to 1 | |
weight_sum = self.dense_weight + self.sparse_weight | |
if weight_sum > 0: | |
self.dense_weight /= weight_sum | |
self.sparse_weight /= weight_sum | |
else: | |
self.dense_weight = 0.7 | |
self.sparse_weight = 0.3 | |
logger.info(f"RRFFusion initialized with k={self.k}, dense_weight={self.dense_weight:.3f}") | |
def fuse_results( | |
self, | |
dense_results: List[Tuple[int, float]], | |
sparse_results: List[Tuple[int, float]] | |
) -> List[Tuple[int, float]]: | |
""" | |
Fuse dense and sparse retrieval results using RRF. | |
Args: | |
dense_results: List of (document_index, score) from dense retrieval | |
sparse_results: List of (document_index, score) from sparse retrieval | |
Returns: | |
List of (document_index, fused_score) tuples sorted by score | |
""" | |
# Handle empty results | |
if not dense_results and not sparse_results: | |
return [] | |
if not dense_results: | |
return sparse_results[:] if sparse_results else [] | |
if not sparse_results: | |
return dense_results[:] if dense_results else [] | |
# Calculate RRF scores for each unique document | |
rrf_scores: Dict[int, float] = defaultdict(float) | |
# Add dense retrieval scores (rank-based) | |
for rank, (doc_idx, _) in enumerate(dense_results, 1): | |
rrf_scores[doc_idx] += self.dense_weight / (self.k + rank) | |
# Add sparse retrieval scores (rank-based) | |
for rank, (doc_idx, _) in enumerate(sparse_results, 1): | |
rrf_scores[doc_idx] += self.sparse_weight / (self.k + rank) | |
# Convert to sorted list | |
fused_results = [ | |
(doc_idx, score) for doc_idx, score in rrf_scores.items() | |
] | |
# Sort by RRF score (descending) | |
fused_results.sort(key=lambda x: x[1], reverse=True) | |
return fused_results | |
def get_strategy_info(self) -> Dict[str, Any]: | |
""" | |
Get information about the RRF fusion strategy. | |
Returns: | |
Dictionary with strategy configuration and statistics | |
""" | |
return { | |
"algorithm": "reciprocal_rank_fusion", | |
"k": self.k, | |
"dense_weight": self.dense_weight, | |
"sparse_weight": self.sparse_weight, | |
"parameters": { | |
"k": self.k, | |
"weights": { | |
"dense": self.dense_weight, | |
"sparse": self.sparse_weight | |
} | |
} | |
} | |
def update_weights(self, dense_weight: float, sparse_weight: float) -> None: | |
""" | |
Update fusion weights dynamically. | |
Args: | |
dense_weight: New weight for dense retrieval | |
sparse_weight: New weight for sparse retrieval | |
""" | |
if not 0 <= dense_weight <= 1: | |
raise ValueError("dense_weight must be between 0 and 1") | |
if not 0 <= sparse_weight <= 1: | |
raise ValueError("sparse_weight must be between 0 and 1") | |
# Normalize weights | |
weight_sum = dense_weight + sparse_weight | |
if weight_sum > 0: | |
self.dense_weight = dense_weight / weight_sum | |
self.sparse_weight = sparse_weight / weight_sum | |
else: | |
raise ValueError("At least one weight must be positive") | |
logger.info(f"Updated RRF weights: dense={self.dense_weight:.3f}, sparse={self.sparse_weight:.3f}") | |
def calculate_individual_scores( | |
self, | |
dense_results: List[Tuple[int, float]], | |
sparse_results: List[Tuple[int, float]] | |
) -> Dict[int, Dict[str, float]]: | |
""" | |
Calculate individual RRF scores for debugging purposes. | |
Args: | |
dense_results: List of (document_index, score) from dense retrieval | |
sparse_results: List of (document_index, score) from sparse retrieval | |
Returns: | |
Dictionary mapping document_index to individual score components | |
""" | |
scores = defaultdict(lambda: {"dense": 0.0, "sparse": 0.0, "total": 0.0}) | |
# Calculate dense scores | |
for rank, (doc_idx, _) in enumerate(dense_results, 1): | |
dense_score = self.dense_weight / (self.k + rank) | |
scores[doc_idx]["dense"] = dense_score | |
scores[doc_idx]["total"] += dense_score | |
# Calculate sparse scores | |
for rank, (doc_idx, _) in enumerate(sparse_results, 1): | |
sparse_score = self.sparse_weight / (self.k + rank) | |
scores[doc_idx]["sparse"] = sparse_score | |
scores[doc_idx]["total"] += sparse_score | |
return dict(scores) | |
def get_optimal_k(self, result_size: int) -> int: | |
""" | |
Get optimal k value based on result size. | |
Args: | |
result_size: Expected final result size | |
Returns: | |
Optimal k value | |
""" | |
# Adaptive k selection based on result size | |
# Smaller k for larger result sets, larger k for smaller sets | |
return max(5, min(60, result_size * 3)) | |
def set_k(self, k: int) -> None: | |
""" | |
Update the RRF k parameter. | |
Args: | |
k: New RRF constant value | |
""" | |
if k <= 0: | |
raise ValueError("k must be positive") | |
self.k = k | |
logger.info(f"Updated RRF k parameter to {k}") | |
def get_rank_contributions( | |
self, | |
dense_results: List[Tuple[int, float]], | |
sparse_results: List[Tuple[int, float]] | |
) -> Dict[int, Dict[str, Any]]: | |
""" | |
Get detailed rank contributions for analysis. | |
Args: | |
dense_results: List of (document_index, score) from dense retrieval | |
sparse_results: List of (document_index, score) from sparse retrieval | |
Returns: | |
Dictionary with rank contributions for each document | |
""" | |
contributions = defaultdict(lambda: { | |
"dense_rank": None, | |
"sparse_rank": None, | |
"dense_contribution": 0.0, | |
"sparse_contribution": 0.0 | |
}) | |
# Track dense ranks and contributions | |
for rank, (doc_idx, _) in enumerate(dense_results, 1): | |
contributions[doc_idx]["dense_rank"] = rank | |
contributions[doc_idx]["dense_contribution"] = self.dense_weight / (self.k + rank) | |
# Track sparse ranks and contributions | |
for rank, (doc_idx, _) in enumerate(sparse_results, 1): | |
contributions[doc_idx]["sparse_rank"] = rank | |
contributions[doc_idx]["sparse_contribution"] = self.sparse_weight / (self.k + rank) | |
return dict(contributions) |