"""
Modular Embedder Implementation.

This module implements the primary Embedder interface that coordinates
all embedding sub-components (model, batch processor, cache) through
a configurable architecture following the ModularDocumentProcessor pattern.

Architecture Notes:
- Implements Embedder interface from core.interfaces
- Coordinates sub-components via configuration-driven selection
- Follows adapter vs direct implementation patterns per specification
- Provides unified interface for embedding generation
- Includes comprehensive error handling and metrics
"""

import logging
import sys
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import numpy as np

# Add project paths for imports. This must run before the ``src`` import below.
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))

from src.core.interfaces import Embedder as EmbedderInterface, HealthStatus

# Forward declaration to avoid circular import
if TYPE_CHECKING:
    from src.core.platform_orchestrator import PlatformOrchestrator

from .base import (
    EmbeddingModel,
    BatchProcessor,
    EmbeddingCache,
    ConfigurableEmbedderComponent,
    ComponentValidationResult
)

# Import sub-component implementations
from .models.sentence_transformer_model import SentenceTransformerModel
from .batch_processors.dynamic_batch_processor import DynamicBatchProcessor
from .caches.memory_cache import MemoryCache

logger = logging.getLogger(__name__)


class ModularEmbedder(EmbedderInterface, ConfigurableEmbedderComponent):
    """
    Modular embedder with configurable sub-components.

    This embedder implements the Embedder interface while providing
    a modular architecture where embedding model, batch processing,
    and caching strategies can be configured independently.

    Features:
    - Configuration-driven sub-component selection
    - Multiple embedding provider support (extensible)
    - Comprehensive error handling and validation
    - Performance metrics and monitoring
    - Pluggable sub-component architecture

    Configuration Structure:
    {
        "model": {
            "type": "sentence_transformer",  # or "openai", "cohere"
            "config": {
                "model_name": "all-MiniLM-L6-v2",
                "device": "mps",
                "normalize_embeddings": true
            }
        },
        "batch_processor": {
            "type": "dynamic",
            "config": {
                "initial_batch_size": 32,
                "max_batch_size": 128,
                "optimize_for_memory": true
            }
        },
        "cache": {
            "type": "memory",  # or "redis", "disk"
            "config": {
                "max_entries": 100000,
                "max_memory_mb": 1024
            }
        }
    }

    Architecture Compliance:
    - EmbeddingModel: Mixed (Direct for local, Adapter for APIs)
    - BatchProcessor: Direct implementation (pure algorithms)
    - EmbeddingCache: Mixed (Direct for memory, Adapter for external stores)
    """

    # Sub-component type mappings
    _MODEL_TYPES = {
        "sentence_transformer": SentenceTransformerModel,
        # Future: "openai": OpenAIEmbeddingAdapter,
        # Future: "cohere": CohereEmbeddingAdapter,
    }

    _BATCH_PROCESSOR_TYPES = {
        "dynamic": DynamicBatchProcessor,
        # Future: "fixed": FixedBatchProcessor,
        # Future: "streaming": StreamingBatchProcessor,
    }

    _CACHE_TYPES = {
        "memory": MemoryCache,
        # Future: "redis": RedisCacheAdapter,
        # Future: "disk": DiskCacheAdapter,
    }

    # Reserved key used by validate_components() to probe the cache. It is
    # deliberately unlikely to match a real input text, so a health check does
    # not overwrite or evict a genuine cached embedding.
    _VALIDATION_PROBE_KEY = "__modular_embedder_validation_probe__"

    def __init__(self, config: Optional[Dict[str, Any]] = None, **kwargs):
        """
        Initialize modular embedder with sub-components.

        Args:
            config: Embedder configuration dictionary
            **kwargs: Alternative configuration parameters (for backward
                compatibility); used as the configuration when ``config``
                is None

        Raises:
            RuntimeError: If validate_components() reports an invalid setup
        """
        # Handle configuration - prioritize explicit config, fallback to kwargs
        if config is None:
            config = kwargs

        super().__init__(config)

        # Initialize sub-components in dependency order
        self.model = self._create_model()
        self.cache = self._create_cache()
        self.batch_processor = self._create_batch_processor()  # Needs model reference

        # Performance tracking
        self._total_embeddings_generated = 0
        self._total_processing_time = 0.0
        self._cache_hits = 0
        self._cache_misses = 0
        self._created_time = time.time()

        # Platform services (initialized via initialize_services)
        self.platform: Optional['PlatformOrchestrator'] = None

        # Validate complete system
        validation_result = self.validate_components()
        if not validation_result.is_valid:
            raise RuntimeError(f"Component validation failed: {validation_result.message}")

        logger.info(
            "ModularEmbedder initialized successfully with sub-components: "
            "model=%s, batch_processor=%s, cache=%s",
            self.model.__class__.__name__,
            self.batch_processor.__class__.__name__,
            self.cache.__class__.__name__,
        )

    def _validate_config(self) -> None:
        """
        Validate embedder configuration.

        Checks that the "model", "batch_processor" and "cache" sections all
        exist, then that each of them carries a "type" and a "config" entry.

        Raises:
            ValueError: If configuration is invalid
        """
        required_sections = ["model", "batch_processor", "cache"]

        # Report a missing section before inspecting the contents of any section.
        for section in required_sections:
            if section not in self.config:
                raise ValueError(f"Missing required configuration section: {section}")

        # Validate each sub-component config
        for section in required_sections:
            section_config = self.config[section]
            for required_key in ("type", "config"):
                if required_key not in section_config:
                    raise ValueError(f"Missing '{required_key}' in {section} configuration")

    def _create_model(self) -> EmbeddingModel:
        """
        Create embedding model sub-component.

        Returns:
            Configured EmbeddingModel instance

        Raises:
            ValueError: If model type is not supported
        """
        model_config = self.config["model"]
        model_type = model_config["type"]

        if model_type not in self._MODEL_TYPES:
            available_types = list(self._MODEL_TYPES.keys())
            raise ValueError(f"Unsupported model type '{model_type}'. Available: {available_types}")

        model_class = self._MODEL_TYPES[model_type]
        model_instance = model_class(model_config["config"])

        logger.debug("Created embedding model: %s -> %s", model_type, model_class.__name__)
        return model_instance

    def _create_batch_processor(self) -> BatchProcessor:
        """
        Create batch processor sub-component.

        Must be called after ``self.model`` is set, because the model is
        passed to the batch processor constructor.

        Returns:
            Configured BatchProcessor instance

        Raises:
            ValueError: If batch processor type is not supported
        """
        batch_config = self.config["batch_processor"]
        batch_type = batch_config["type"]

        if batch_type not in self._BATCH_PROCESSOR_TYPES:
            available_types = list(self._BATCH_PROCESSOR_TYPES.keys())
            raise ValueError(f"Unsupported batch processor type '{batch_type}'. Available: {available_types}")

        batch_class = self._BATCH_PROCESSOR_TYPES[batch_type]
        # BatchProcessor needs reference to the embedding model
        batch_instance = batch_class(batch_config["config"], self.model)

        logger.debug("Created batch processor: %s -> %s", batch_type, batch_class.__name__)
        return batch_instance

    def _create_cache(self) -> EmbeddingCache:
        """
        Create embedding cache sub-component.

        Returns:
            Configured EmbeddingCache instance

        Raises:
            ValueError: If cache type is not supported
        """
        cache_config = self.config["cache"]
        cache_type = cache_config["type"]

        if cache_type not in self._CACHE_TYPES:
            available_types = list(self._CACHE_TYPES.keys())
            raise ValueError(f"Unsupported cache type '{cache_type}'. Available: {available_types}")

        cache_class = self._CACHE_TYPES[cache_type]
        cache_instance = cache_class(cache_config["config"])

        logger.debug("Created embedding cache: %s -> %s", cache_type, cache_class.__name__)
        return cache_instance

    def _track_embedding_performance(self, metrics: Dict[str, Any]) -> None:
        """
        Report embedding metrics to the platform on a best-effort basis.

        A failure inside the platform's tracking call is logged and swallowed
        so that telemetry can neither turn a successful embed() into an error
        nor replace the RuntimeError that embed() documents on failure.

        Args:
            metrics: Metrics payload forwarded to the platform
        """
        try:
            self.platform.track_component_performance(
                self,
                "embedding_generation",
                metrics
            )
        except Exception as tracking_error:
            logger.warning("Failed to record embedding performance metrics: %s", tracking_error)

    def embed(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of texts using the modular architecture.

        This method coordinates all sub-components:
        1. Check cache for existing embeddings
        2. Use batch processor for optimal throughput on cache misses
           (each distinct uncached text is computed once per call)
        3. Store new embeddings in cache
        4. Return combined results

        Args:
            texts: List of text strings to embed

        Returns:
            List of embedding vectors, where each vector is a list of floats,
            in the same order as ``texts``

        Raises:
            ValueError: If texts list is empty
            RuntimeError: If embedding generation fails
        """
        if not texts:
            raise ValueError("Cannot generate embeddings for empty text list")

        start_time = time.time()

        try:
            # Step 1: Check cache for existing embeddings
            cached_embeddings = {}
            # Uncached text -> every position it occupies in ``texts``. Dict
            # insertion order keeps first-seen order for the batch processor.
            pending_positions: Dict[str, List[int]] = {}

            for i, text in enumerate(texts):
                cached_embedding = self.cache.get(text)
                if cached_embedding is not None:
                    cached_embeddings[i] = cached_embedding
                    self._cache_hits += 1
                else:
                    pending_positions.setdefault(text, []).append(i)
                    self._cache_misses += 1

            # Step 2: Generate embeddings for cache misses using batch processor
            new_embeddings = {}
            if pending_positions:
                texts_for_processing = list(pending_positions)

                # Use batch processor for optimal throughput
                processed_embeddings = self.batch_processor.process_batch(
                    texts_for_processing,
                    batch_size=32  # Will be optimized by batch processor
                )

                # Guard against a processor returning the wrong number of
                # vectors, which would otherwise misalign or drop results.
                if len(processed_embeddings) != len(texts_for_processing):
                    raise RuntimeError(
                        f"Batch processor returned {len(processed_embeddings)} embeddings "
                        f"for {len(texts_for_processing)} texts"
                    )

                # Step 3: Store new embeddings in cache and collect results
                for text, embedding in zip(texts_for_processing, processed_embeddings):
                    # Store in cache
                    self.cache.put(text, embedding)

                    # Store for result assembly at every position of this text
                    for original_index in pending_positions[text]:
                        new_embeddings[original_index] = embedding

            # Step 4: Assemble final results in original order, converting each
            # embedding to list format as required by the interface
            result_embeddings = [
                (cached_embeddings[i] if i in cached_embeddings else new_embeddings[i]).tolist()
                for i in range(len(texts))
            ]

            # Update performance statistics
            processing_time = time.time() - start_time
            self._total_embeddings_generated += len(texts)
            self._total_processing_time += processing_time

            # Track performance using platform services
            if self.platform:
                self._track_embedding_performance({
                    "success": True,
                    "processing_time": processing_time,
                    "texts_count": len(texts),
                    "cache_hits": len(cached_embeddings),
                    "new_embeddings": len(new_embeddings),
                    "embedding_dimension": self.embedding_dim()
                })

            logger.debug(
                "Generated %d embeddings in %.3fs (cache hits: %d, computed: %d)",
                len(texts),
                processing_time,
                len(cached_embeddings),
                len(new_embeddings),
            )

            return result_embeddings

        except Exception as e:
            # Track failure using platform services
            if self.platform:
                processing_time = time.time() - start_time
                self._track_embedding_performance({
                    "success": False,
                    "processing_time": processing_time,
                    "texts_count": len(texts),
                    "error": str(e)
                })

            logger.error("Embedding generation failed: %s", e)
            raise RuntimeError(f"Failed to generate embeddings: {str(e)}") from e

    def embedding_dim(self) -> int:
        """
        Get the embedding dimension.

        Returns:
            Integer dimension of embeddings, as reported by the model
        """
        return self.model.get_embedding_dim()

    def get_model_info(self) -> Dict[str, Any]:
        """
        Get comprehensive information about the embedder and its sub-components.

        Returns:
            Dictionary with embedder configuration and status
        """
        return {
            "component_type": "modular_embedder",
            "embedding_dimension": self.embedding_dim(),
            "model": {
                "type": self.config["model"]["type"],
                "info": self.model.get_model_info() if hasattr(self.model, 'get_model_info') else {}
            },
            "batch_processor": {
                "type": self.config["batch_processor"]["type"],
                "stats": self.batch_processor.get_batch_stats()
            },
            "cache": {
                "type": self.config["cache"]["type"],
                "stats": self.cache.get_cache_stats()
            },
            "performance": self.get_performance_stats(),
            "uptime_seconds": time.time() - self._created_time
        }

    def get_performance_stats(self) -> Dict[str, Any]:
        """
        Get performance statistics for the embedder.

        Returns:
            Dictionary with performance metrics. The hit rate and throughput
            are 0.0 when nothing has been requested or timed yet.
        """
        total_requests = self._cache_hits + self._cache_misses
        cache_hit_rate = self._cache_hits / total_requests if total_requests > 0 else 0.0

        avg_throughput = (
            self._total_embeddings_generated / self._total_processing_time
            if self._total_processing_time > 0 else 0.0
        )

        return {
            "total_embeddings_generated": self._total_embeddings_generated,
            "total_processing_time": self._total_processing_time,
            "average_throughput": avg_throughput,
            "cache_hits": self._cache_hits,
            "cache_misses": self._cache_misses,
            "cache_hit_rate": cache_hit_rate,
            "uptime_seconds": time.time() - self._created_time
        }

    def supports_batching(self) -> bool:
        """
        Check if this embedder supports batch processing.

        Returns:
            True, as this implementation supports efficient batch processing
        """
        return True

    def validate_components(self) -> ComponentValidationResult:
        """
        Validate all sub-components are properly configured and functional.

        Runs a test encode on the model, a put/get/invalidate round trip on
        the cache under a reserved probe key, and checks that the batch
        processor exposes ``process_batch``. Failures are reported through the
        returned result, not raised.

        Returns:
            ComponentValidationResult with validation status
        """
        try:
            # Test model
            if not self.model.is_available():
                return ComponentValidationResult(
                    False,
                    "Embedding model is not available",
                    {"model_type": self.config["model"]["type"]}
                )

            # Test model with dummy data
            try:
                test_embedding = self.model.encode(["test"])
                if test_embedding.size == 0:
                    return ComponentValidationResult(
                        False,
                        "Model produced empty embedding",
                        {"model_type": self.config["model"]["type"]}
                    )
            except Exception as e:
                return ComponentValidationResult(
                    False,
                    f"Model encoding test failed: {e}",
                    {"model_type": self.config["model"]["type"]}
                )

            # Test cache
            try:
                # Test cache operations
                probe_key = self._VALIDATION_PROBE_KEY
                test_embedding = np.array([1.0, 2.0, 3.0])
                self.cache.put(probe_key, test_embedding)
                try:
                    retrieved = self.cache.get(probe_key)
                finally:
                    # Clean up test data even when the lookup itself fails
                    self.cache.invalidate(probe_key)

                if retrieved is None or not np.array_equal(retrieved, test_embedding):
                    return ComponentValidationResult(
                        False,
                        "Cache put/get operations failed",
                        {"cache_type": self.config["cache"]["type"]}
                    )
            except Exception as e:
                return ComponentValidationResult(
                    False,
                    f"Cache operations test failed: {e}",
                    {"cache_type": self.config["cache"]["type"]}
                )

            # Test batch processor
            if not hasattr(self.batch_processor, 'process_batch'):
                return ComponentValidationResult(
                    False,
                    "Batch processor missing required methods",
                    {"batch_processor_type": self.config["batch_processor"]["type"]}
                )

            return ComponentValidationResult(
                True,
                "All components validated successfully",
                {
                    "model_type": self.config["model"]["type"],
                    "batch_processor_type": self.config["batch_processor"]["type"],
                    "cache_type": self.config["cache"]["type"],
                    "embedding_dimension": self.embedding_dim()
                }
            )

        except Exception as e:
            return ComponentValidationResult(
                False,
                f"Component validation failed with error: {e}",
                {"error_type": type(e).__name__}
            )

    def get_sub_components(self) -> Dict[str, Any]:
        """
        Get information about all sub-components for factory logging.

        Returns:
            Dictionary with sub-component details
        """
        return {
            "components": {
                "model": {
                    "type": self.config["model"]["type"],
                    "class": self.model.__class__.__name__,
                    "available": self.model.is_available()
                },
                "batch_processor": {
                    "type": self.config["batch_processor"]["type"],
                    "class": self.batch_processor.__class__.__name__,
                    "supports_streaming": self.batch_processor.supports_streaming()
                },
                "cache": {
                    "type": self.config["cache"]["type"],
                    "class": self.cache.__class__.__name__,
                    "size": self.cache.get_cache_size()
                }
            },
            "architecture": "modular_embedder",
            "total_sub_components": 3
        }

    # Standard ComponentBase interface implementation
    def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
        """Initialize platform services for the component.

        Args:
            platform: PlatformOrchestrator instance providing services
        """
        self.platform = platform
        logger.info("ModularEmbedder initialized with platform services")

    def get_health_status(self) -> HealthStatus:
        """Get the current health status of the component.

        Delegates to the platform when one has been attached; otherwise the
        status is derived from validate_components().

        Returns:
            HealthStatus object with component health information
        """
        if self.platform:
            return self.platform.check_component_health(self)

        # Fallback if platform services not initialized
        validation_result = self.validate_components()

        return HealthStatus(
            is_healthy=validation_result.is_valid,
            status="healthy" if validation_result.is_valid else "unhealthy",
            details={
                "validation_message": validation_result.message,
                "validation_details": validation_result.details,
                "sub_components": self.get_sub_components(),
                "performance": self.get_performance_stats()
            }
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get component-specific metrics.

        Delegates to the platform when one has been attached; otherwise the
        metrics are assembled locally.

        Returns:
            Dictionary containing component metrics
        """
        if self.platform:
            return self.platform.collect_component_metrics(self)

        # Fallback if platform services not initialized
        return {
            "performance": self.get_performance_stats(),
            "model_info": self.get_model_info(),
            "sub_components": self.get_sub_components(),
            "cache_stats": {
                "hits": self._cache_hits,
                "misses": self._cache_misses,
                # max(1, ...) avoids division by zero before any lookup
                "hit_rate": self._cache_hits / max(1, self._cache_hits + self._cache_misses)
            }
        }

    def get_capabilities(self) -> List[str]:
        """Get list of component capabilities.

        Returns:
            List of capability strings
        """
        capabilities = [
            "text_embedding",
            "batch_processing",
            "caching",
            "modular_architecture",
            "performance_optimization",
            "streaming_support"
        ]

        # Sub-components are compared against None rather than tested for
        # truthiness: a component that defines __len__ (for example an empty
        # cache) would otherwise be treated as absent.

        # Add model-specific capabilities
        if self.model is not None:
            capabilities.append(f"model_{self.config['model']['type']}")

        # Add batch processor capabilities
        if self.batch_processor is not None:
            capabilities.append(f"batch_processor_{self.config['batch_processor']['type']}")
            if self.batch_processor.supports_streaming():
                capabilities.append("streaming_processing")

        # Add cache capabilities
        if self.cache is not None:
            capabilities.append(f"cache_{self.config['cache']['type']}")

        return capabilities

    def cleanup(self) -> None:
        """Clean up resources used by sub-components.

        Each step is optional and independent: a sub-component that is
        missing (for example after a partially failed __init__) or that lacks
        the hook is skipped, and a failure in one step is logged without
        preventing the remaining steps.
        """
        cleanup_steps = (
            ("cache", "clear"),  # Clear cache
            ("model", "clear_model_cache"),  # Clean up model cache if available
            ("batch_processor", "reset_performance_stats"),  # Reset batch processor stats if available
        )

        had_error = False
        for attribute_name, hook_name in cleanup_steps:
            component = getattr(self, attribute_name, None)
            hook = getattr(component, hook_name, None)
            if hook is None:
                continue
            try:
                hook()
            except Exception as e:
                had_error = True
                logger.warning("Error during cleanup: %s", e)

        if not had_error:
            logger.info("ModularEmbedder cleanup completed")

    def __del__(self):
        """Cleanup on destruction."""
        try:
            self.cleanup()
        except Exception:
            # Deliberate best-effort: nothing useful can be done with an
            # error raised while the object is being destroyed.
            pass  # Ignore errors during destruction