Spaces:
Sleeping
Sleeping
File size: 8,968 Bytes
5e1a30c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
"""
Base interfaces for embedder sub-components.
This module defines the abstract interfaces for all embedder sub-components
following the architecture specification in COMPONENT-3-EMBEDDER.md.
Interfaces are derived from rag-interface-reference.md section 3.2.
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Tuple
import numpy as np
import sys
from pathlib import Path
# Add project root to path for imports
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))
class EmbeddingModel(ABC):
"""
Core embedding generation interface.
This interface defines the contract for all embedding model implementations,
whether they are direct implementations (for local models) or adapters
(for external APIs like OpenAI, Cohere).
Architecture Pattern:
- Direct Implementation: Local models (SentenceTransformers)
- Adapter Pattern: External APIs (OpenAI, Cohere)
"""
@abstractmethod
def encode(self, texts: List[str]) -> np.ndarray:
"""
Encode texts to embeddings.
Args:
texts: List of text strings to embed
Returns:
numpy array of shape (len(texts), embedding_dim)
Raises:
ValueError: If texts list is empty
RuntimeError: If encoding fails
"""
pass
@abstractmethod
def get_model_name(self) -> str:
"""
Return model identifier.
Returns:
String identifier for the embedding model
"""
pass
@abstractmethod
def get_embedding_dim(self) -> int:
"""
Return embedding dimension.
Returns:
Integer dimension of embeddings produced by this model
"""
pass
@abstractmethod
def get_max_sequence_length(self) -> int:
"""
Return maximum sequence length supported by the model.
Returns:
Maximum number of tokens/characters the model can process
"""
pass
@abstractmethod
def is_available(self) -> bool:
"""
Check if the model is available and ready for use.
Returns:
True if model is loaded and ready, False otherwise
"""
pass
class BatchProcessor(ABC):
"""
Optimize batch processing interface.
This interface defines batch processing strategies for efficient
embedding generation. All implementations use direct pattern since
they contain pure optimization algorithms without external dependencies.
Architecture Pattern: Direct Implementation (pure algorithms)
"""
@abstractmethod
def process_batch(self, texts: List[str], batch_size: int) -> np.ndarray:
"""
Process texts in batches for optimal throughput.
Args:
texts: List of text strings to process
batch_size: Size of each processing batch
Returns:
numpy array of embeddings with shape (len(texts), embedding_dim)
Raises:
ValueError: If batch_size is invalid
RuntimeError: If batch processing fails
"""
pass
@abstractmethod
def optimize_batch_size(self, sample_texts: List[str]) -> int:
"""
Determine optimal batch size based on sample texts and hardware.
Args:
sample_texts: Representative sample of texts to analyze
Returns:
Optimal batch size for the given hardware and text characteristics
"""
pass
@abstractmethod
def get_batch_stats(self) -> Dict[str, Any]:
"""
Get statistics about batch processing performance.
Returns:
Dictionary with metrics like average batch size, processing time, etc.
"""
pass
@abstractmethod
def supports_streaming(self) -> bool:
"""
Check if this processor supports streaming/incremental processing.
Returns:
True if streaming is supported, False otherwise
"""
pass
class EmbeddingCache(ABC):
"""
Cache computed embeddings interface.
This interface defines caching strategies for avoiding recomputation
of embeddings. Implementations follow different patterns based on
the storage backend.
Architecture Patterns:
- Direct Implementation: In-memory cache
- Adapter Pattern: External stores (Redis, disk)
"""
@abstractmethod
def get(self, text: str) -> Optional[np.ndarray]:
"""
Retrieve cached embedding for text.
Args:
text: Text string to look up
Returns:
Cached embedding array or None if not found
Raises:
RuntimeError: If cache retrieval fails
"""
pass
@abstractmethod
def put(self, text: str, embedding: np.ndarray) -> None:
"""
Store embedding in cache.
Args:
text: Text string as cache key
embedding: Embedding array to store
Raises:
ValueError: If text or embedding is invalid
RuntimeError: If cache storage fails
"""
pass
@abstractmethod
def invalidate(self, pattern: str) -> int:
"""
Invalidate cache entries matching pattern.
Args:
pattern: Pattern to match for invalidation (supports wildcards)
Returns:
Number of entries invalidated
Raises:
RuntimeError: If invalidation fails
"""
pass
@abstractmethod
def get_cache_stats(self) -> Dict[str, Any]:
"""
Get cache performance statistics.
Returns:
Dictionary with hit rate, size, evictions, etc.
"""
pass
@abstractmethod
def clear(self) -> None:
"""
Clear all entries from the cache.
Raises:
RuntimeError: If cache clearing fails
"""
pass
@abstractmethod
def get_cache_size(self) -> int:
"""
Get current number of cached entries.
Returns:
Number of entries currently in cache
"""
pass
class ConfigurableEmbedderComponent(ABC):
"""
Base class for configurable embedder components.
This provides common configuration and validation functionality
that all embedder sub-components can inherit.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize component with configuration.
Args:
config: Component-specific configuration dictionary
"""
self.config = config
self._validate_config()
@abstractmethod
def _validate_config(self) -> None:
"""
Validate component configuration.
Raises:
ValueError: If configuration is invalid
"""
pass
def get_config(self) -> Dict[str, Any]:
"""
Get current configuration.
Returns:
Configuration dictionary
"""
return self.config.copy()
def update_config(self, updates: Dict[str, Any]) -> None:
"""
Update component configuration.
Args:
updates: Configuration updates to apply
Raises:
ValueError: If updated configuration is invalid
"""
old_config = self.config.copy()
self.config.update(updates)
try:
self._validate_config()
except ValueError:
# Restore old configuration if validation fails
self.config = old_config
raise
# Component validation result for health checks
class ComponentValidationResult:
"""Result of component validation/health check."""
def __init__(self, is_valid: bool, message: str, details: Optional[Dict[str, Any]] = None):
"""
Initialize validation result.
Args:
is_valid: Whether component passed validation
message: Human-readable validation message
details: Optional additional validation details
"""
self.is_valid = is_valid
self.message = message
self.details = details or {}
def __bool__(self) -> bool:
"""Allow boolean evaluation of validation result."""
return self.is_valid
def __str__(self) -> str:
"""String representation of validation result."""
status = "VALID" if self.is_valid else "INVALID"
return f"{status}: {self.message}" |