""" | |
Weighted Fusion implementation for Modular Retriever Architecture. | |
This module provides a direct implementation of score-based weighted fusion | |
as an alternative to RRF for improved modularity and flexibility. | |
""" | |
import logging | |
from typing import List, Dict, Any, Tuple | |
from .base import FusionStrategy | |
logger = logging.getLogger(__name__) | |
class WeightedFusion(FusionStrategy): | |
""" | |
Weighted score fusion implementation. | |
This is a direct implementation of score-based fusion that combines | |
dense and sparse retrieval results using direct score weighting. | |
No external dependencies are required. | |
Score Formula: final_score = dense_weight * dense_score + sparse_weight * sparse_score | |
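
    A small worked example of the formula: with dense_weight=0.7 and
    sparse_weight=0.3, a document with dense_score=0.8 and sparse_score=0.5
    receives final_score = 0.7 * 0.8 + 0.3 * 0.5 = 0.71.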

    Features:
        - Direct score weighting (not rank-based)
        - Optional score normalization
        - Configurable weights for dense and sparse retrieval
        - Handles empty result sets gracefully
        - Preserves original score information

    Example:
        config = {
            "weights": {
                "dense": 0.7,
                "sparse": 0.3
            },
            "normalize": True
        }
        fusion = WeightedFusion(config)
        results = fusion.fuse_results(dense_results, sparse_results)
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the weighted fusion strategy.

        Args:
            config: Configuration dictionary with:
                - weights: Dictionary with dense and sparse weights
                    - dense: Weight for dense retrieval (default: 0.7)
                    - sparse: Weight for sparse retrieval (default: 0.3)
                - normalize: Whether to normalize scores to the [0, 1] range
                  (default: True)
        """
        self.config = config

        # Extract weights.
        weights = config.get("weights", {})
        self.dense_weight = weights.get("dense", 0.7)
        self.sparse_weight = weights.get("sparse", 0.3)
        self.normalize = config.get("normalize", True)

        # Validate raw weights before rescaling.
        if not 0 <= self.dense_weight <= 1:
            raise ValueError("dense_weight must be between 0 and 1")
        if not 0 <= self.sparse_weight <= 1:
            raise ValueError("sparse_weight must be between 0 and 1")

        # Rescale the weights so they sum to 1; e.g. dense=0.6, sparse=0.6
        # becomes dense=0.5, sparse=0.5.
        weight_sum = self.dense_weight + self.sparse_weight
        if weight_sum > 0:
            self.dense_weight /= weight_sum
            self.sparse_weight /= weight_sum
        else:
            # Both weights are zero: fall back to the defaults rather than
            # dividing by zero.
            logger.warning("Both fusion weights are zero; falling back to defaults (0.7/0.3)")
            self.dense_weight = 0.7
            self.sparse_weight = 0.3

        logger.info(
            f"WeightedFusion initialized with dense_weight={self.dense_weight:.3f}, "
            f"sparse_weight={self.sparse_weight:.3f}, normalize={self.normalize}"
        )

    def fuse_results(
        self,
        dense_results: List[Tuple[int, float]],
        sparse_results: List[Tuple[int, float]]
    ) -> List[Tuple[int, float]]:
        """
        Fuse dense and sparse retrieval results using weighted scoring.

        Args:
            dense_results: List of (document_index, score) from dense retrieval
            sparse_results: List of (document_index, score) from sparse retrieval

        Returns:
            List of (document_index, fused_score) tuples sorted by fused score
            in descending order
        """
        # Handle empty result sets. If only one source returned anything,
        # its results are passed through unchanged (no weighting or
        # normalization is applied).
        if not dense_results and not sparse_results:
            return []
        if not dense_results:
            return list(sparse_results)
        if not sparse_results:
            return list(dense_results)

        # Normalize scores if requested.
        normalized_dense = self._normalize_scores(dense_results) if self.normalize else dense_results
        normalized_sparse = self._normalize_scores(sparse_results) if self.normalize else sparse_results

        # Convert to dictionaries for efficient lookup.
        dense_scores = dict(normalized_dense)
        sparse_scores = dict(normalized_sparse)

        # Union of document IDs seen by either retriever; a document missing
        # from one source contributes 0.0 for that component.
        all_docs = set(dense_scores.keys()) | set(sparse_scores.keys())

        # Calculate weighted scores.
        weighted_results = []
        for doc_id in all_docs:
            dense_score = dense_scores.get(doc_id, 0.0)
            sparse_score = sparse_scores.get(doc_id, 0.0)
            final_score = self.dense_weight * dense_score + self.sparse_weight * sparse_score
            weighted_results.append((doc_id, final_score))

        # Sort by final score (descending).
        weighted_results.sort(key=lambda x: x[1], reverse=True)
        return weighted_results

    def get_strategy_info(self) -> Dict[str, Any]:
        """
        Get information about the weighted fusion strategy.

        Returns:
            Dictionary with the strategy's configuration
        """
        return {
            "algorithm": "weighted_score_fusion",
            "dense_weight": self.dense_weight,
            "sparse_weight": self.sparse_weight,
            "normalize": self.normalize,
            "parameters": {
                "weights": {
                    "dense": self.dense_weight,
                    "sparse": self.sparse_weight
                },
                "normalize": self.normalize
            }
        }

    def _normalize_scores(self, results: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
        """
        Min-max normalize scores to the [0, 1] range.

        Args:
            results: List of (document_index, score) tuples

        Returns:
            List of (document_index, normalized_score) tuples
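
        Example:
            Scores [2.0, 5.0, 8.0] map to [0.0, 0.5, 1.0].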
""" | |
if not results: | |
return [] | |
scores = [score for _, score in results] | |
max_score = max(scores) | |
min_score = min(scores) | |
score_range = max_score - min_score | |
if score_range == 0: | |
# All scores are the same, return as-is | |
return results | |
# Normalize to [0,1] range | |
normalized_results = [ | |
(doc_id, (score - min_score) / score_range) | |
for doc_id, score in results | |
] | |
return normalized_results | |

    def update_weights(self, dense_weight: float, sparse_weight: float) -> None:
        """
        Update fusion weights dynamically.

        Args:
            dense_weight: New weight for dense retrieval
            sparse_weight: New weight for sparse retrieval
        """
        if not 0 <= dense_weight <= 1:
            raise ValueError("dense_weight must be between 0 and 1")
        if not 0 <= sparse_weight <= 1:
            raise ValueError("sparse_weight must be between 0 and 1")

        # Rescale so the weights sum to 1.
        weight_sum = dense_weight + sparse_weight
        if weight_sum > 0:
            self.dense_weight = dense_weight / weight_sum
            self.sparse_weight = sparse_weight / weight_sum
        else:
            raise ValueError("At least one weight must be positive")

        logger.info(f"Updated weighted fusion weights: dense={self.dense_weight:.3f}, sparse={self.sparse_weight:.3f}")

    def set_normalize(self, normalize: bool) -> None:
        """
        Update the normalization setting.

        Args:
            normalize: Whether to normalize scores
        """
        self.normalize = normalize
        logger.info(f"Updated normalization setting to {normalize}")

    def calculate_individual_scores(
        self,
        dense_results: List[Tuple[int, float]],
        sparse_results: List[Tuple[int, float]]
    ) -> Dict[int, Dict[str, float]]:
        """
        Calculate individual weighted score components for debugging purposes.

        Args:
            dense_results: List of (document_index, score) from dense retrieval
            sparse_results: List of (document_index, score) from sparse retrieval

        Returns:
            Dictionary mapping document_index to individual score components
        """
        # Normalize scores if requested.
        normalized_dense = self._normalize_scores(dense_results) if self.normalize else dense_results
        normalized_sparse = self._normalize_scores(sparse_results) if self.normalize else sparse_results

        dense_scores = dict(normalized_dense)
        sparse_scores = dict(normalized_sparse)
        all_docs = set(dense_scores.keys()) | set(sparse_scores.keys())

        scores = {}
        for doc_id in all_docs:
            dense_score = dense_scores.get(doc_id, 0.0)
            sparse_score = sparse_scores.get(doc_id, 0.0)
            weighted_dense = self.dense_weight * dense_score
            weighted_sparse = self.sparse_weight * sparse_score
            scores[doc_id] = {
                "dense": dense_score,
                "sparse": sparse_score,
                "weighted_dense": weighted_dense,
                "weighted_sparse": weighted_sparse,
                "total": weighted_dense + weighted_sparse
            }
        return scores

    def get_score_statistics(
        self,
        dense_results: List[Tuple[int, float]],
        sparse_results: List[Tuple[int, float]]
    ) -> Dict[str, Any]:
        """
        Get raw (pre-normalization) score statistics for analysis.

        Args:
            dense_results: List of (document_index, score) from dense retrieval
            sparse_results: List of (document_index, score) from sparse retrieval

        Returns:
            Dictionary with per-source score statistics
        """
        stats = {}
        if dense_results:
            dense_scores = [score for _, score in dense_results]
            stats["dense"] = {
                "min": min(dense_scores),
                "max": max(dense_scores),
                "mean": sum(dense_scores) / len(dense_scores),
                "count": len(dense_scores)
            }
        if sparse_results:
            sparse_scores = [score for _, score in sparse_results]
            stats["sparse"] = {
                "min": min(sparse_scores),
                "max": max(sparse_scores),
                "mean": sum(sparse_scores) / len(sparse_scores),
                "count": len(sparse_scores)
            }
        return stats
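

# --- Usage sketch (illustrative, not part of the module's public API) ---
# A minimal demonstration of the fusion flow on toy data; the document ids
# and scores below are made up. Because this module uses a relative import,
# run it as a module from the package root, e.g.
# `python -m <your_package>.weighted_fusion` (the package path is an
# assumption, fill in your own).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    fusion = WeightedFusion({
        "weights": {"dense": 0.7, "sparse": 0.3},
        "normalize": True,
    })

    # Toy results as (document_index, score) pairs. Documents 1 and 2 appear
    # in both result sets; documents 0 and 3 appear in only one, so the
    # missing component contributes 0.0 to their fused score.
    dense = [(0, 0.92), (1, 0.85), (2, 0.40)]
    sparse = [(1, 12.3), (2, 9.8), (3, 4.1)]

    print("Fused ranking:", fusion.fuse_results(dense, sparse))

    # Per-document breakdown of the weighted components, for debugging.
    for doc_id, parts in fusion.calculate_individual_scores(dense, sparse).items():
        print(doc_id, parts)

    # Weights can be shifted at runtime; they are rescaled to sum to 1.
    fusion.update_weights(0.5, 0.5)
    print("Re-fused ranking:", fusion.fuse_results(dense, sparse))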