Spaces:
Running
Running
File size: 6,301 Bytes
9f5e57c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
"""
Sparse Retrieval using BM25 algorithm for keyword-based document search.
Complements dense semantic search with exact term matching capabilities.
"""
from typing import List, Dict, Tuple, Optional
from rank_bm25 import BM25Okapi
import re
import time
import numpy as np
class BM25SparseRetriever:
"""
BM25-based sparse retrieval for technical documentation.
Optimized for technical terms, acronyms, and precise keyword matching
that complements semantic similarity search in hybrid RAG systems.
Performance: Handles 1000+ chunks efficiently with <100ms search times.
"""
def __init__(self, k1: float = 1.2, b: float = 0.75):
"""
BM25 retriever for technical documentation.
Args:
k1: Term frequency saturation parameter (1.2 typical)
Higher values increase influence of term frequency
b: Document length normalization factor (0.75 typical)
0.0 = no normalization, 1.0 = full normalization
Raises:
ValueError: If k1 or b parameters are invalid
"""
if k1 <= 0:
raise ValueError("k1 must be positive")
if not 0 <= b <= 1:
raise ValueError("b must be between 0 and 1")
self.k1 = k1
self.b = b
self.bm25: Optional[BM25Okapi] = None
self.corpus: List[str] = []
self.tokenized_corpus: List[List[str]] = []
self.chunk_mapping: List[int] = []
# Compile regex for technical term preservation
self._tech_pattern = re.compile(r'[a-zA-Z0-9][\w\-_.]*[a-zA-Z0-9]|[a-zA-Z0-9]')
self._punctuation_pattern = re.compile(r'[^\w\s\-_.]')
def _preprocess_text(self, text: str) -> List[str]:
"""
Preprocess text preserving technical terms and acronyms.
Args:
text: Raw text to tokenize
Returns:
List of preprocessed tokens
Performance: ~10K tokens/second
"""
if not text or not text.strip():
return []
# Convert to lowercase while preserving structure
text = text.lower()
# Remove punctuation except hyphens, underscores, periods in technical terms
text = self._punctuation_pattern.sub(' ', text)
# Extract technical terms (handles RISC-V, RV32I, ARM Cortex-M, etc.)
tokens = self._tech_pattern.findall(text)
# Filter out single characters and empty strings
tokens = [token for token in tokens if len(token) > 1]
return tokens
def index_documents(self, chunks: List[Dict]) -> None:
"""
Index chunk texts for BM25 search with performance monitoring.
Args:
chunks: List of chunk dictionaries with 'text' field
Raises:
ValueError: If chunks is empty or malformed
KeyError: If chunks missing required 'text' field
Performance: ~1000 chunks/second preprocessing + indexing
"""
if not chunks:
raise ValueError("Cannot index empty chunk list")
start_time = time.time()
# Extract and validate texts
try:
texts = [chunk['text'] for chunk in chunks]
except KeyError:
raise KeyError("All chunks must contain 'text' field")
# Preprocess all texts
self.corpus = texts
self.tokenized_corpus = [self._preprocess_text(text) for text in texts]
# Filter out empty tokenized texts and track mapping
valid_corpus = []
self.chunk_mapping = []
for i, tokens in enumerate(self.tokenized_corpus):
if tokens: # Only include non-empty tokenized texts
valid_corpus.append(tokens)
self.chunk_mapping.append(i)
if not valid_corpus:
raise ValueError("No valid text content found in chunks")
# Create BM25 index
self.bm25 = BM25Okapi(valid_corpus, k1=self.k1, b=self.b)
elapsed = time.time() - start_time
tokens_per_sec = sum(len(tokens) for tokens in self.tokenized_corpus) / elapsed
print(f"Indexed {len(chunks)} chunks ({len(valid_corpus)} valid) in {elapsed:.3f}s")
print(f"Processing rate: {tokens_per_sec:.1f} tokens/second")
def search(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
"""
Search for relevant chunks using BM25 with normalized scores.
Args:
query: Search query string
top_k: Maximum number of results to return
Returns:
List of (chunk_index, normalized_score) tuples sorted by relevance
Scores normalized to [0,1] range for fusion compatibility
Raises:
ValueError: If not indexed or invalid parameters
Performance: <100ms for 1000+ document corpus
"""
if self.bm25 is None:
raise ValueError("Must call index_documents() before searching")
if not query or not query.strip():
return []
if top_k <= 0:
raise ValueError("top_k must be positive")
# Preprocess query using same method as documents
query_tokens = self._preprocess_text(query)
if not query_tokens:
return []
# Get BM25 scores for all documents
scores = self.bm25.get_scores(query_tokens)
if len(scores) == 0:
return []
# Normalize scores to [0,1] range
max_score = np.max(scores)
if max_score > 0:
normalized_scores = scores / max_score
else:
normalized_scores = scores
# Create results with original chunk indices
results = [
(self.chunk_mapping[i], float(normalized_scores[i]))
for i in range(len(scores))
]
# Sort by score (descending) and return top_k
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k] |