"""
Modular Embedder Implementation.
This module implements the primary Embedder interface that coordinates
all embedding sub-components (model, batch processor, cache) through
a configurable architecture following the ModularDocumentProcessor pattern.
Architecture Notes:
- Implements Embedder interface from core.interfaces
- Coordinates sub-components via configuration-driven selection
- Follows adapter vs direct implementation patterns per specification
- Provides unified interface for embedding generation
- Includes comprehensive error handling and metrics
"""
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, TYPE_CHECKING
import numpy as np
import logging
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))
from src.core.interfaces import Embedder as EmbedderInterface, HealthStatus
# Type-checking-only import to avoid a circular dependency at runtime
if TYPE_CHECKING:
from src.core.platform_orchestrator import PlatformOrchestrator
from .base import (
EmbeddingModel,
BatchProcessor,
EmbeddingCache,
ConfigurableEmbedderComponent,
ComponentValidationResult
)
# Import sub-component implementations
from .models.sentence_transformer_model import SentenceTransformerModel
from .batch_processors.dynamic_batch_processor import DynamicBatchProcessor
from .caches.memory_cache import MemoryCache
logger = logging.getLogger(__name__)
class ModularEmbedder(EmbedderInterface, ConfigurableEmbedderComponent):
"""
Modular embedder with configurable sub-components.
This embedder implements the Embedder interface while providing
a modular architecture where embedding model, batch processing, and
caching strategies can be configured independently.
Features:
- Configuration-driven sub-component selection
- Multiple embedding provider support (extensible)
- Comprehensive error handling and validation
- Performance metrics and monitoring
- Pluggable sub-component architecture
Configuration Structure:
{
"model": {
"type": "sentence_transformer", # or "openai", "cohere"
"config": {
"model_name": "all-MiniLM-L6-v2",
"device": "mps",
"normalize_embeddings": true
}
},
"batch_processor": {
"type": "dynamic",
"config": {
"initial_batch_size": 32,
"max_batch_size": 128,
"optimize_for_memory": true
}
},
"cache": {
"type": "memory", # or "redis", "disk"
"config": {
"max_entries": 100000,
"max_memory_mb": 1024
}
}
}
Architecture Compliance:
- EmbeddingModel: Mixed (Direct for local, Adapter for APIs)
- BatchProcessor: Direct implementation (pure algorithms)
- EmbeddingCache: Mixed (Direct for memory, Adapter for external stores)
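    Usage (a minimal sketch; assumes the sentence_transformer sub-component
    and its model weights are available in your environment):

        embedder = ModularEmbedder({
            "model": {
                "type": "sentence_transformer",
                "config": {"model_name": "all-MiniLM-L6-v2",
                           "device": "mps",
                           "normalize_embeddings": True}
            },
            "batch_processor": {
                "type": "dynamic",
                "config": {"initial_batch_size": 32, "max_batch_size": 128,
                           "optimize_for_memory": True}
            },
            "cache": {
                "type": "memory",
                "config": {"max_entries": 100000, "max_memory_mb": 1024}
            }
        })
        vectors = embedder.embed(["hello world"])
        assert len(vectors[0]) == embedder.embedding_dim()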
"""
# Sub-component type mappings
_MODEL_TYPES = {
"sentence_transformer": SentenceTransformerModel,
# Future: "openai": OpenAIEmbeddingAdapter,
# Future: "cohere": CohereEmbeddingAdapter,
}
_BATCH_PROCESSOR_TYPES = {
"dynamic": DynamicBatchProcessor,
# Future: "fixed": FixedBatchProcessor,
# Future: "streaming": StreamingBatchProcessor,
}
_CACHE_TYPES = {
"memory": MemoryCache,
# Future: "redis": RedisCacheAdapter,
# Future: "disk": DiskCacheAdapter,
}
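    # Extension sketch (hypothetical): a new provider can be plugged in by
    # mapping a config "type" string to a class implementing the matching
    # base interface before the embedder is instantiated, e.g.
    #   ModularEmbedder._MODEL_TYPES["openai"] = OpenAIEmbeddingAdapter
    # where OpenAIEmbeddingAdapter is an assumed EmbeddingModel subclass.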
    def __init__(self, config: Optional[Dict[str, Any]] = None, **kwargs):
        """
        Initialize modular embedder with sub-components.

        Args:
            config: Embedder configuration dictionary
            **kwargs: Alternative configuration parameters (for backward compatibility)
        """
        # Handle configuration: prefer an explicit config dict, fall back to kwargs
        if config is None:
            config = kwargs
super().__init__(config)
# Initialize sub-components in dependency order
self.model = self._create_model()
self.cache = self._create_cache()
self.batch_processor = self._create_batch_processor() # Needs model reference
# Performance tracking
self._total_embeddings_generated = 0
self._total_processing_time = 0.0
self._cache_hits = 0
self._cache_misses = 0
self._created_time = time.time()
# Platform services (initialized via initialize_services)
self.platform: Optional['PlatformOrchestrator'] = None
# Validate complete system
validation_result = self.validate_components()
if not validation_result.is_valid:
raise RuntimeError(f"Component validation failed: {validation_result.message}")
logger.info(f"ModularEmbedder initialized successfully with sub-components: "
f"model={self.model.__class__.__name__}, "
f"batch_processor={self.batch_processor.__class__.__name__}, "
f"cache={self.cache.__class__.__name__}")
    def _validate_config(self) -> None:
        """
        Validate embedder configuration.

        Raises:
            ValueError: If configuration is invalid
        """
        required_sections = ["model", "batch_processor", "cache"]
        for section in required_sections:
            if section not in self.config:
                raise ValueError(f"Missing required configuration section: {section}")
            section_config = self.config[section]
            if "type" not in section_config:
                raise ValueError(f"Missing 'type' in {section} configuration")
            if "config" not in section_config:
                raise ValueError(f"Missing 'config' in {section} configuration")
def _create_model(self) -> EmbeddingModel:
"""
Create embedding model sub-component.
Returns:
Configured EmbeddingModel instance
Raises:
ValueError: If model type is not supported
"""
model_config = self.config["model"]
model_type = model_config["type"]
if model_type not in self._MODEL_TYPES:
available_types = list(self._MODEL_TYPES.keys())
raise ValueError(f"Unsupported model type '{model_type}'. Available: {available_types}")
model_class = self._MODEL_TYPES[model_type]
model_instance = model_class(model_config["config"])
logger.debug(f"Created embedding model: {model_type} -> {model_class.__name__}")
return model_instance
def _create_batch_processor(self) -> BatchProcessor:
"""
Create batch processor sub-component.
Returns:
Configured BatchProcessor instance
Raises:
ValueError: If batch processor type is not supported
"""
batch_config = self.config["batch_processor"]
batch_type = batch_config["type"]
if batch_type not in self._BATCH_PROCESSOR_TYPES:
available_types = list(self._BATCH_PROCESSOR_TYPES.keys())
raise ValueError(f"Unsupported batch processor type '{batch_type}'. Available: {available_types}")
batch_class = self._BATCH_PROCESSOR_TYPES[batch_type]
# BatchProcessor needs reference to the embedding model
batch_instance = batch_class(batch_config["config"], self.model)
logger.debug(f"Created batch processor: {batch_type} -> {batch_class.__name__}")
return batch_instance
def _create_cache(self) -> EmbeddingCache:
"""
Create embedding cache sub-component.
Returns:
Configured EmbeddingCache instance
Raises:
ValueError: If cache type is not supported
"""
cache_config = self.config["cache"]
cache_type = cache_config["type"]
if cache_type not in self._CACHE_TYPES:
available_types = list(self._CACHE_TYPES.keys())
raise ValueError(f"Unsupported cache type '{cache_type}'. Available: {available_types}")
cache_class = self._CACHE_TYPES[cache_type]
cache_instance = cache_class(cache_config["config"])
logger.debug(f"Created embedding cache: {cache_type} -> {cache_class.__name__}")
return cache_instance
def embed(self, texts: List[str]) -> List[List[float]]:
"""
Generate embeddings for a list of texts using the modular architecture.
This method coordinates all sub-components:
1. Check cache for existing embeddings
2. Use batch processor for optimal throughput on cache misses
3. Store new embeddings in cache
4. Return combined results
Args:
texts: List of text strings to embed
Returns:
List of embedding vectors, where each vector is a list of floats
Raises:
ValueError: If texts list is empty
RuntimeError: If embedding generation fails
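
        Example (sketch; repeated calls hit the cache for identical texts):
            first = embedder.embed(["same text"])   # cache miss, computed
            second = embedder.embed(["same text"])  # cache hit, no re-encode
            assert first == second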
"""
if not texts:
raise ValueError("Cannot generate embeddings for empty text list")
start_time = time.time()
try:
# Step 1: Check cache for existing embeddings
cached_embeddings = {}
texts_to_compute = []
for i, text in enumerate(texts):
cached_embedding = self.cache.get(text)
if cached_embedding is not None:
cached_embeddings[i] = cached_embedding
self._cache_hits += 1
else:
texts_to_compute.append((i, text))
self._cache_misses += 1
# Step 2: Generate embeddings for cache misses using batch processor
new_embeddings = {}
if texts_to_compute:
texts_for_processing = [text for _, text in texts_to_compute]
# Use batch processor for optimal throughput
processed_embeddings = self.batch_processor.process_batch(
texts_for_processing,
batch_size=32 # Will be optimized by batch processor
)
# Step 3: Store new embeddings in cache and collect results
for j, (original_index, text) in enumerate(texts_to_compute):
embedding = processed_embeddings[j]
# Store in cache
self.cache.put(text, embedding)
# Store for result assembly
new_embeddings[original_index] = embedding
# Step 4: Assemble final results in original order
result_embeddings = []
for i in range(len(texts)):
if i in cached_embeddings:
embedding = cached_embeddings[i]
else:
embedding = new_embeddings[i]
# Convert to list format as required by interface
result_embeddings.append(embedding.tolist())
# Update performance statistics
processing_time = time.time() - start_time
self._total_embeddings_generated += len(texts)
self._total_processing_time += processing_time
# Track performance using platform services
if self.platform:
self.platform.track_component_performance(
self,
"embedding_generation",
{
"success": True,
"processing_time": processing_time,
"texts_count": len(texts),
"cache_hits": len(cached_embeddings),
"new_embeddings": len(new_embeddings),
"embedding_dimension": self.embedding_dim()
}
)
logger.debug(f"Generated {len(texts)} embeddings in {processing_time:.3f}s "
f"(cache hits: {len(cached_embeddings)}, computed: {len(new_embeddings)})")
return result_embeddings
except Exception as e:
# Track failure using platform services
if self.platform:
processing_time = time.time() - start_time
self.platform.track_component_performance(
self,
"embedding_generation",
{
"success": False,
"processing_time": processing_time,
"texts_count": len(texts),
"error": str(e)
}
)
logger.error(f"Embedding generation failed: {e}")
raise RuntimeError(f"Failed to generate embeddings: {str(e)}") from e
def embedding_dim(self) -> int:
"""
Get the embedding dimension.
Returns:
Integer dimension of embeddings
"""
return self.model.get_embedding_dim()
def get_model_info(self) -> Dict[str, Any]:
"""
Get comprehensive information about the embedder and its sub-components.
Returns:
Dictionary with embedder configuration and status
"""
return {
"component_type": "modular_embedder",
"embedding_dimension": self.embedding_dim(),
"model": {
"type": self.config["model"]["type"],
"info": self.model.get_model_info() if hasattr(self.model, 'get_model_info') else {}
},
"batch_processor": {
"type": self.config["batch_processor"]["type"],
"stats": self.batch_processor.get_batch_stats()
},
"cache": {
"type": self.config["cache"]["type"],
"stats": self.cache.get_cache_stats()
},
"performance": self.get_performance_stats(),
"uptime_seconds": time.time() - self._created_time
}
def get_performance_stats(self) -> Dict[str, Any]:
"""
Get performance statistics for the embedder.
Returns:
Dictionary with performance metrics
"""
total_requests = self._cache_hits + self._cache_misses
cache_hit_rate = self._cache_hits / total_requests if total_requests > 0 else 0.0
avg_throughput = (
self._total_embeddings_generated / self._total_processing_time
if self._total_processing_time > 0 else 0.0
)
return {
"total_embeddings_generated": self._total_embeddings_generated,
"total_processing_time": self._total_processing_time,
"average_throughput": avg_throughput,
"cache_hits": self._cache_hits,
"cache_misses": self._cache_misses,
"cache_hit_rate": cache_hit_rate,
"uptime_seconds": time.time() - self._created_time
}
def supports_batching(self) -> bool:
"""
Check if this embedder supports batch processing.
Returns:
True, as this implementation supports efficient batch processing
"""
return True
def validate_components(self) -> ComponentValidationResult:
"""
Validate all sub-components are properly configured and functional.
Returns:
ComponentValidationResult with validation status
"""
try:
# Test model
if not self.model.is_available():
return ComponentValidationResult(
False,
"Embedding model is not available",
{"model_type": self.config["model"]["type"]}
)
# Test model with dummy data
try:
test_embedding = self.model.encode(["test"])
if test_embedding.size == 0:
return ComponentValidationResult(
False,
"Model produced empty embedding",
{"model_type": self.config["model"]["type"]}
)
except Exception as e:
return ComponentValidationResult(
False,
f"Model encoding test failed: {e}",
{"model_type": self.config["model"]["type"]}
)
# Test cache
try:
# Test cache operations
test_embedding = np.array([1.0, 2.0, 3.0])
self.cache.put("test_key", test_embedding)
retrieved = self.cache.get("test_key")
if retrieved is None or not np.array_equal(retrieved, test_embedding):
return ComponentValidationResult(
False,
"Cache put/get operations failed",
{"cache_type": self.config["cache"]["type"]}
)
# Clean up test data
self.cache.invalidate("test_key")
except Exception as e:
return ComponentValidationResult(
False,
f"Cache operations test failed: {e}",
{"cache_type": self.config["cache"]["type"]}
)
# Test batch processor
if not hasattr(self.batch_processor, 'process_batch'):
return ComponentValidationResult(
False,
"Batch processor missing required methods",
{"batch_processor_type": self.config["batch_processor"]["type"]}
)
return ComponentValidationResult(
True,
"All components validated successfully",
{
"model_type": self.config["model"]["type"],
"batch_processor_type": self.config["batch_processor"]["type"],
"cache_type": self.config["cache"]["type"],
"embedding_dimension": self.embedding_dim()
}
)
except Exception as e:
return ComponentValidationResult(
False,
f"Component validation failed with error: {e}",
{"error_type": type(e).__name__}
)
def get_sub_components(self) -> Dict[str, Any]:
"""
Get information about all sub-components for factory logging.
Returns:
Dictionary with sub-component details
"""
return {
"components": {
"model": {
"type": self.config["model"]["type"],
"class": self.model.__class__.__name__,
"available": self.model.is_available()
},
"batch_processor": {
"type": self.config["batch_processor"]["type"],
"class": self.batch_processor.__class__.__name__,
"supports_streaming": self.batch_processor.supports_streaming()
},
"cache": {
"type": self.config["cache"]["type"],
"class": self.cache.__class__.__name__,
"size": self.cache.get_cache_size()
}
},
"architecture": "modular_embedder",
"total_sub_components": 3
}
# Standard ComponentBase interface implementation
def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
"""Initialize platform services for the component.
Args:
platform: PlatformOrchestrator instance providing services
"""
self.platform = platform
logger.info("ModularEmbedder initialized with platform services")
def get_health_status(self) -> HealthStatus:
"""Get the current health status of the component.
Returns:
HealthStatus object with component health information
"""
if self.platform:
return self.platform.check_component_health(self)
# Fallback if platform services not initialized
validation_result = self.validate_components()
return HealthStatus(
is_healthy=validation_result.is_valid,
status="healthy" if validation_result.is_valid else "unhealthy",
details={
"validation_message": validation_result.message,
"validation_details": validation_result.details,
"sub_components": self.get_sub_components(),
"performance": self.get_performance_stats()
}
)
def get_metrics(self) -> Dict[str, Any]:
"""Get component-specific metrics.
Returns:
Dictionary containing component metrics
"""
if self.platform:
return self.platform.collect_component_metrics(self)
# Fallback if platform services not initialized
return {
"performance": self.get_performance_stats(),
"model_info": self.get_model_info(),
"sub_components": self.get_sub_components(),
"cache_stats": {
"hits": self._cache_hits,
"misses": self._cache_misses,
"hit_rate": self._cache_hits / max(1, self._cache_hits + self._cache_misses)
}
}
def get_capabilities(self) -> List[str]:
"""Get list of component capabilities.
Returns:
List of capability strings
"""
        capabilities = [
            "text_embedding",
            "batch_processing",
            "caching",
            "modular_architecture",
            "performance_optimization"
        ]
# Add model-specific capabilities
if self.model:
capabilities.append(f"model_{self.config['model']['type']}")
# Add batch processor capabilities
if self.batch_processor:
capabilities.append(f"batch_processor_{self.config['batch_processor']['type']}")
            if self.batch_processor.supports_streaming():
                capabilities.append("streaming_support")
# Add cache capabilities
if self.cache:
capabilities.append(f"cache_{self.config['cache']['type']}")
return capabilities
def cleanup(self) -> None:
"""Clean up resources used by sub-components."""
try:
# Clear cache
if hasattr(self.cache, 'clear'):
self.cache.clear()
# Clean up model cache if available
if hasattr(self.model, 'clear_model_cache'):
self.model.clear_model_cache()
# Reset batch processor stats if available
if hasattr(self.batch_processor, 'reset_performance_stats'):
self.batch_processor.reset_performance_stats()
logger.info("ModularEmbedder cleanup completed")
except Exception as e:
logger.warning(f"Error during cleanup: {e}")
def __del__(self):
"""Cleanup on destruction."""
try:
self.cleanup()
except Exception:
pass # Ignore errors during destruction