File size: 6,239 Bytes
9f5e57c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
Reciprocal Rank Fusion for combining dense and sparse retrieval results.
Implements the RRF algorithm with configurable weighting.
"""

from typing import List, Tuple, Dict
from collections import defaultdict


def reciprocal_rank_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    k: int = 60,
) -> List[Tuple[int, float]]:
    """
    Combine dense and sparse retrieval using Reciprocal Rank Fusion.

    RRF Formula: score = Σ weight_i / (k + rank_i)
    where rank_i is the 1-based rank of the document in result list i.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...] sorted by relevance
        sparse_results: [(chunk_idx, bm25_score), ...] sorted by relevance
        dense_weight: Weight for semantic similarity (default 0.7);
            sparse contributions are weighted by 1 - dense_weight
        k: RRF constant controlling rank influence (default 60)

    Returns:
        Fused results as [(chunk_idx, rrf_score), ...] sorted by combined
        score, descending. Scores are always on the RRF scale — including
        when one input list is empty — so values are comparable across calls.

    Raises:
        ValueError: If dense_weight is outside [0, 1] or k is not positive.

    Performance: O((n + m) log(n + m)) — linear accumulation plus final sort.
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")

    if k <= 0:
        raise ValueError("k must be positive")

    sparse_weight = 1.0 - dense_weight

    # Accumulate RRF contributions per document. The defaultdict handles
    # documents appearing in either or both lists uniformly, which also
    # covers empty inputs without special-casing. (Previously an empty
    # list caused the other list to be returned with raw retrieval
    # scores, which are not on the RRF scale.)
    rrf_scores: Dict[int, float] = defaultdict(float)

    # Dense contributions, rank-based (ranks start at 1).
    for rank, (chunk_idx, _) in enumerate(dense_results, 1):
        rrf_scores[chunk_idx] += dense_weight / (k + rank)

    # Sparse contributions, rank-based.
    for rank, (chunk_idx, _) in enumerate(sparse_results, 1):
        rrf_scores[chunk_idx] += sparse_weight / (k + rank)

    # Highest fused score first.
    return sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)


def weighted_score_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    normalize: bool = True,
) -> List[Tuple[int, float]]:
    """
    Alternative fusion using direct score weighting (not rank-based).

    Score formula:
        final = dense_weight * dense_score + (1 - dense_weight) * sparse_score

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7)
        normalize: Whether to min-max normalize each list to [0, 1] before
            weighting (recommended: dense and sparse scores live on
            different scales)

    Returns:
        Fused results [(chunk_idx, weighted_score), ...] sorted by score,
        descending. Documents missing from one list contribute 0.0 for
        that component.

    Raises:
        ValueError: If dense_weight is outside [0, 1].
    """
    if not 0 <= dense_weight <= 1:
        raise ValueError("dense_weight must be between 0 and 1")

    sparse_weight = 1.0 - dense_weight

    def _min_max(results: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
        """Min-max normalize scores to [0, 1]; degenerate ranges map to 1.0."""
        values = [score for _, score in results]
        lo, hi = min(values), max(values)
        if hi <= lo:
            # Single result or all-equal scores: map to 1.0 so these
            # documents still contribute. (Previously they normalized to
            # 0.0, silently discarding the entire list's signal.)
            return [(idx, 1.0) for idx, _ in results]
        span = hi - lo
        return [(idx, (score - lo) / span) for idx, score in results]

    if normalize:
        if dense_results:
            dense_results = _min_max(dense_results)
        if sparse_results:
            sparse_results = _min_max(sparse_results)

    # Dict lookups make the per-document score fetch O(1).
    dense_scores = dict(dense_results)
    sparse_scores = dict(sparse_results)

    # Weighted combination over the union of documents from both retrievers.
    fused = [
        (
            doc_id,
            dense_weight * dense_scores.get(doc_id, 0.0)
            + sparse_weight * sparse_scores.get(doc_id, 0.0),
        )
        for doc_id in dense_scores.keys() | sparse_scores.keys()
    ]

    # Highest fused score first.
    fused.sort(key=lambda x: x[1], reverse=True)
    return fused


def adaptive_fusion(
    dense_results: List[Tuple[int, float]],
    sparse_results: List[Tuple[int, float]],
    dense_weight: float = 0.7,
    result_size: int = 10,
) -> List[Tuple[int, float]]:
    """
    Select a fusion strategy based on how many unique documents were retrieved.

    Small candidate pools (at most 20 unique documents) are fused with
    direct weighted scoring, preserving the variation in the raw scores.
    Larger pools use Reciprocal Rank Fusion, which handles the differing
    score scales of dense and sparse retrieval more robustly.

    Args:
        dense_results: [(chunk_idx, similarity_score), ...]
        sparse_results: [(chunk_idx, bm25_score), ...]
        dense_weight: Weight for semantic scores (default 0.7)
        result_size: Expected final result size; drives the RRF k choice

    Returns:
        Fused results with preserved score variation
    """
    unique_docs = {idx for idx, _ in dense_results}
    unique_docs |= {idx for idx, _ in sparse_results}

    if len(unique_docs) <= 20:
        # Weighted fusion keeps score variation for small pools.
        return weighted_score_fusion(
            dense_results, sparse_results, dense_weight, normalize=True
        )

    # RRF with k tied to the expected output size, clamped to [5, 60].
    adaptive_k = max(5, min(60, result_size * 3))
    return reciprocal_rank_fusion(
        dense_results, sparse_results, dense_weight, k=adaptive_k
    )