Spaces:
Running
Running
""" | |
Component Factory for Phase 3 Direct Wiring Architecture. | |
This module provides a lightweight factory for direct component instantiation, | |
eliminating the ComponentRegistry overhead and improving startup performance. | |
It supports both legacy and unified architectures with type-safe component creation. | |
""" | |
import logging | |
import time | |
import hashlib | |
from typing import Dict, Type, Any, Optional, Union | |
from pathlib import Path | |
from collections import defaultdict, OrderedDict | |
from .interfaces import ( | |
DocumentProcessor, | |
Embedder, | |
VectorStore, | |
Retriever, | |
AnswerGenerator, | |
QueryProcessor | |
) | |
# Component classes will be imported lazily to avoid circular imports | |
# See _get_component_class() method for lazy loading implementation | |
logger = logging.getLogger(__name__) | |
class ComponentFactory: | |
""" | |
Lightweight factory for direct component instantiation. | |
This factory replaces the ComponentRegistry with direct class mappings, | |
eliminating lookup overhead and improving startup performance by ~20%. | |
It maintains type safety and provides clear error messages. | |
Features: | |
- Direct component class mapping (no registry lookup) | |
- Type-safe instantiation with validation | |
- Support for both legacy and unified architectures | |
- Comprehensive error handling with actionable messages | |
- Performance optimized for production workloads | |
Example: | |
factory = ComponentFactory() | |
# Create components directly | |
processor = factory.create_processor("hybrid_pdf", chunk_size=1000) | |
embedder = factory.create_embedder("sentence_transformer", use_mps=True) | |
retriever = factory.create_retriever("unified", embedder=embedder, dense_weight=0.7) | |
""" | |
# Component type mappings - module paths for lazy loading | |
_PROCESSORS: Dict[str, str] = { | |
"hybrid_pdf": "src.components.processors.document_processor.ModularDocumentProcessor", | |
"modular": "src.components.processors.document_processor.ModularDocumentProcessor", | |
"pdf_processor": "src.components.processors.pdf_processor.HybridPDFProcessor", # Legacy processor | |
"legacy_pdf": "src.components.processors.pdf_processor.HybridPDFProcessor", # Alias for legacy | |
} | |
_EMBEDDERS: Dict[str, str] = { | |
"modular": "src.components.embedders.modular_embedder.ModularEmbedder", | |
"sentence_transformer": "src.components.embedders.sentence_transformer_embedder.SentenceTransformerEmbedder", | |
"sentence_transformers": "src.components.embedders.sentence_transformer_embedder.SentenceTransformerEmbedder", # Alias for compatibility | |
} | |
_VECTOR_STORES: Dict[str, str] = { | |
# Legacy vector stores removed - functionality moved to UnifiedRetriever | |
# "faiss": "src.components.vector_stores.faiss_store.FAISSVectorStore", | |
} | |
_RETRIEVERS: Dict[str, str] = { | |
# Legacy Phase 1 architecture moved to archive | |
# "hybrid": "src.components.retrievers.hybrid_retriever.HybridRetriever", | |
"unified": "src.components.retrievers.unified_retriever.UnifiedRetriever", | |
"modular_unified": "src.components.retrievers.modular_unified_retriever.ModularUnifiedRetriever", | |
# Note: enhanced_modular_unified removed - Epic 2 features now handled via advanced config transformation | |
} | |
_GENERATORS: Dict[str, str] = { | |
"adaptive": "src.components.generators.adaptive_generator.AdaptiveAnswerGenerator", | |
"adaptive_generator": "src.components.generators.adaptive_generator.AdaptiveAnswerGenerator", # Alias for compatibility | |
"adaptive_modular": "src.components.generators.answer_generator.AnswerGenerator", # New modular implementation | |
} | |
_QUERY_PROCESSORS: Dict[str, str] = { | |
"modular": "src.components.query_processors.modular_query_processor.ModularQueryProcessor", | |
"modular_query_processor": "src.components.query_processors.modular_query_processor.ModularQueryProcessor", # Alias for compatibility | |
} | |
# Phase 4: Performance monitoring and caching | |
_performance_metrics: Dict[str, Dict[str, Any]] = defaultdict(lambda: { | |
"creation_count": 0, | |
"total_time": 0.0, | |
"average_time": 0.0, | |
"min_time": float('inf'), | |
"max_time": 0.0, | |
"last_created": None | |
}) | |
# Component cache for reusable instances (LRU with max size) | |
_component_cache: OrderedDict[str, Any] = OrderedDict() | |
_cache_max_size: int = 10 # Max cached components | |
_cacheable_types = {"embedder"} # Only cache expensive components | |
# Cache metrics tracking (configurable for production) | |
_cache_metrics_enabled: bool = True # Can be disabled for production | |
_cache_hits: int = 0 | |
_cache_misses: int = 0 | |
_cache_operations: Dict[str, int] = defaultdict(int) | |
# Class cache for lazy loading | |
_class_cache: Dict[str, Type] = {} | |
def _get_component_class(cls, module_path: str) -> Type: | |
""" | |
Lazily import and cache component class. | |
Args: | |
module_path: Module path in format "src.package.module.ClassName" | |
Returns: | |
Component class | |
""" | |
if module_path in cls._class_cache: | |
return cls._class_cache[module_path] | |
try: | |
# Split module path and class name | |
parts = module_path.split('.') | |
class_name = parts[-1] | |
module_path_only = '.'.join(parts[:-1]) | |
# Import module using absolute import | |
from importlib import import_module | |
module = import_module(module_path_only) | |
# Get class from module | |
component_class = getattr(module, class_name) | |
# Cache for future use | |
cls._class_cache[module_path] = component_class | |
return component_class | |
except (ImportError, AttributeError) as e: | |
# Enhanced error handling with dependency suggestions | |
error_msg = str(e) | |
suggestions = [] | |
# Check for specific missing dependencies and provide actionable suggestions | |
if "rank_bm25" in error_msg or "BM25" in error_msg: | |
suggestions.append("Install rank-bm25: pip install rank-bm25>=0.2.2") | |
elif "pdfplumber" in error_msg: | |
suggestions.append("Install pdfplumber: pip install pdfplumber>=0.10.0") | |
elif "sentence_transformers" in error_msg: | |
suggestions.append("Install sentence-transformers: pip install sentence-transformers>=2.2.0") | |
elif "transformers" in error_msg: | |
suggestions.append("Install transformers: pip install transformers>=4.30.0") | |
elif "spacy" in error_msg: | |
suggestions.append("Install spacy: pip install spacy>=3.7.0") | |
suggestions.append("Download spacy model: python -m spacy download en_core_web_sm") | |
elif "weaviate" in error_msg: | |
suggestions.append("Install weaviate-client: pip install weaviate-client>=3.15.0") | |
elif "huggingface_hub" in error_msg: | |
suggestions.append("Install huggingface-hub: pip install huggingface-hub>=0.16.0") | |
elif "accelerate" in error_msg: | |
suggestions.append("Install accelerate: pip install accelerate>=0.20.0") | |
enhanced_msg = f"Failed to import {module_path}: {e}" | |
if suggestions: | |
enhanced_msg += f"\n\nSuggested fixes:\n" + "\n".join(f" - {s}" for s in suggestions) | |
enhanced_msg += f"\n\nAlternatively, install all requirements: pip install -r requirements.txt" | |
logger.error(enhanced_msg) | |
raise ImportError(enhanced_msg) from e | |
def get_performance_metrics(cls) -> Dict[str, Dict[str, Any]]: | |
""" | |
Get performance metrics for component creation. | |
Returns: | |
Dictionary with creation metrics by component type | |
""" | |
return dict(cls._performance_metrics) | |
def reset_performance_metrics(cls) -> None: | |
"""Reset all performance metrics.""" | |
cls._performance_metrics.clear() | |
def get_cache_stats(cls) -> Dict[str, Any]: | |
""" | |
Get component cache statistics. | |
Returns: | |
Dictionary with cache size, hit rate, etc. | |
""" | |
total_operations = cls._cache_hits + cls._cache_misses | |
hit_rate = cls._cache_hits / total_operations if total_operations > 0 else 0.0 | |
return { | |
"cache_size": len(cls._component_cache), | |
"max_size": cls._cache_max_size, | |
"cached_components": list(cls._component_cache.keys()), | |
"cacheable_types": cls._cacheable_types, | |
"metrics_enabled": cls._cache_metrics_enabled, | |
"hits": cls._cache_hits, | |
"misses": cls._cache_misses, | |
"total_operations": total_operations, | |
"hit_rate": hit_rate, | |
"operations_by_type": dict(cls._cache_operations) | |
} | |
def clear_cache(cls) -> None: | |
"""Clear the component cache.""" | |
cls._component_cache.clear() | |
def enable_cache_metrics(cls, enabled: bool = True) -> None: | |
""" | |
Enable/disable cache metrics tracking. | |
Args: | |
enabled: Whether to enable metrics tracking | |
""" | |
cls._cache_metrics_enabled = enabled | |
def reset_cache_metrics(cls) -> None: | |
"""Reset cache metrics counters.""" | |
cls._cache_hits = 0 | |
cls._cache_misses = 0 | |
cls._cache_operations.clear() | |
def _get_cache_key(cls, component_type: str, **kwargs) -> str: | |
""" | |
Generate cache key for component configuration. | |
Args: | |
component_type: Type of component | |
**kwargs: Component configuration | |
Returns: | |
Cache key string | |
""" | |
# Create deterministic key from component type and config | |
config_str = str(sorted(kwargs.items())) | |
key_material = f"{component_type}:{config_str}" | |
return hashlib.md5(key_material.encode()).hexdigest()[:16] | |
def _get_from_cache(cls, cache_key: str) -> Optional[Any]: | |
""" | |
Get component from cache (LRU update). | |
Args: | |
cache_key: Cache key | |
Returns: | |
Cached component or None | |
""" | |
if cache_key in cls._component_cache: | |
# Track cache hit | |
if cls._cache_metrics_enabled: | |
cls._cache_hits += 1 | |
component_type = cache_key.split('_')[0] # Extract component type from key | |
cls._cache_operations[f"hit_{component_type}"] += 1 | |
# Move to end (most recently used) | |
component = cls._component_cache.pop(cache_key) | |
cls._component_cache[cache_key] = component | |
return component | |
else: | |
# Track cache miss | |
if cls._cache_metrics_enabled: | |
cls._cache_misses += 1 | |
component_type = cache_key.split('_')[0] # Extract component type from key | |
cls._cache_operations[f"miss_{component_type}"] += 1 | |
return None | |
def _add_to_cache(cls, cache_key: str, component: Any) -> None: | |
""" | |
Add component to cache with LRU eviction. | |
Args: | |
cache_key: Cache key | |
component: Component to cache | |
""" | |
# Remove oldest if at capacity | |
if len(cls._component_cache) >= cls._cache_max_size: | |
cls._component_cache.popitem(last=False) # Remove oldest | |
cls._component_cache[cache_key] = component | |
def _track_performance(cls, component_type: str, creation_time: float) -> None: | |
""" | |
Track performance metrics for component creation. | |
Args: | |
component_type: Type of component created | |
creation_time: Time taken to create component in seconds | |
""" | |
metrics = cls._performance_metrics[component_type] | |
metrics["creation_count"] += 1 | |
metrics["total_time"] += creation_time | |
metrics["average_time"] = metrics["total_time"] / metrics["creation_count"] | |
metrics["min_time"] = min(metrics["min_time"], creation_time) | |
metrics["max_time"] = max(metrics["max_time"], creation_time) | |
metrics["last_created"] = time.time() | |
def _create_with_tracking(cls, component_class: Type, component_type: str, use_cache: bool = False, **kwargs) -> Any: | |
""" | |
Create component with performance tracking and optional caching. | |
Args: | |
component_class: Class to instantiate | |
component_type: Type identifier for tracking | |
use_cache: Whether to use component caching | |
**kwargs: Constructor arguments | |
Returns: | |
Instantiated component | |
""" | |
# Check cache first if caching is enabled | |
cache_key = None | |
if use_cache: | |
cache_key = cls._get_cache_key(component_type, **kwargs) | |
cached_component = cls._get_from_cache(cache_key) | |
if cached_component is not None: | |
logger.debug(f"Cache hit for {component_type}") | |
cls._track_performance(f"{component_type}_cached", 0.0) | |
return cached_component | |
start_time = time.time() | |
try: | |
# Log component creation with essential information (INFO level for visibility) | |
component = component_class(**kwargs) | |
creation_time = time.time() - start_time | |
# Enhanced logging with component details | |
component_name = component.__class__.__name__ | |
component_module = component.__class__.__module__ | |
logger.info(f"🏭 ComponentFactory created: {component_name} " | |
f"(type={component_type}, module={component_module}, " | |
f"time={creation_time:.3f}s)") | |
# Log component-specific info if available | |
sub_components_logged = False | |
# Check for ModularEmbedder and ModularDocumentProcessor sub-components | |
if hasattr(component, 'get_sub_components'): | |
try: | |
sub_info = component.get_sub_components() | |
if isinstance(sub_info, dict) and 'components' in sub_info: | |
components = sub_info['components'] | |
sub_components = [f"{k}:{v.get('class', 'Unknown')}" for k, v in components.items()] | |
logger.info(f" └─ Sub-components: {', '.join(sub_components)}") | |
sub_components_logged = True | |
except Exception: | |
pass # Don't fail component creation on logging issues | |
# Fallback to legacy get_component_info for backward compatibility | |
if not sub_components_logged and hasattr(component, 'get_component_info'): | |
try: | |
info = component.get_component_info() | |
if isinstance(info, dict) and len(info) > 0: | |
sub_components = [f"{k}:{v.get('class', 'Unknown')}" for k, v in info.items()] | |
logger.info(f" └─ Sub-components: {', '.join(sub_components)}") | |
except Exception: | |
pass # Don't fail component creation on logging issues | |
# Add to cache if caching is enabled | |
if use_cache and cache_key: | |
cls._add_to_cache(cache_key, component) | |
cls._track_performance(component_type, creation_time) | |
return component | |
except Exception as e: | |
creation_time = time.time() - start_time | |
cls._track_performance(f"{component_type}_failed", creation_time) | |
raise | |
def create_processor(cls, processor_type: str, **kwargs) -> DocumentProcessor: | |
""" | |
Create a document processor instance. | |
Args: | |
processor_type: Type of processor ("hybrid_pdf" or "pdf_processor") | |
**kwargs: Arguments to pass to the processor constructor | |
Returns: | |
Instantiated DocumentProcessor | |
Raises: | |
ValueError: If processor type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if processor_type not in cls._PROCESSORS: | |
available = list(cls._PROCESSORS.keys()) | |
raise ValueError( | |
f"Unknown processor type '{processor_type}'. " | |
f"Available processors: {available}" | |
) | |
processor_module_path = cls._PROCESSORS[processor_type] | |
processor_class = cls._get_component_class(processor_module_path) | |
try: | |
return cls._create_with_tracking( | |
processor_class, | |
f"processor_{processor_type}", | |
**kwargs | |
) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create processor '{processor_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def create_embedder(cls, embedder_type: str, **kwargs) -> Embedder: | |
""" | |
Create an embedder instance. | |
Args: | |
embedder_type: Type of embedder ("sentence_transformer") | |
**kwargs: Arguments to pass to the embedder constructor | |
Returns: | |
Instantiated Embedder | |
Raises: | |
ValueError: If embedder type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if embedder_type not in cls._EMBEDDERS: | |
available = list(cls._EMBEDDERS.keys()) | |
raise ValueError( | |
f"Unknown embedder type '{embedder_type}'. " | |
f"Available embedders: {available}" | |
) | |
embedder_module_path = cls._EMBEDDERS[embedder_type] | |
embedder_class = cls._get_component_class(embedder_module_path) | |
try: | |
# Use caching for embedders (expensive to create) | |
return cls._create_with_tracking( | |
embedder_class, | |
f"embedder_{embedder_type}", | |
use_cache=True, # Enable caching for embedders | |
**kwargs | |
) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create embedder '{embedder_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def create_vector_store(cls, store_type: str, **kwargs) -> VectorStore: | |
""" | |
Create a vector store instance. | |
Args: | |
store_type: Type of vector store ("faiss") | |
**kwargs: Arguments to pass to the vector store constructor | |
Returns: | |
Instantiated VectorStore | |
Raises: | |
ValueError: If vector store type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if store_type not in cls._VECTOR_STORES: | |
available = list(cls._VECTOR_STORES.keys()) | |
raise ValueError( | |
f"Unknown vector store type '{store_type}'. " | |
f"Available vector stores: {available}" | |
) | |
store_module_path = cls._VECTOR_STORES[store_type] | |
store_class = cls._get_component_class(store_module_path) | |
try: | |
logger.debug(f"Creating {store_type} vector store with args: {kwargs}") | |
return store_class(**kwargs) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create vector store '{store_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def create_retriever(cls, retriever_type: str, **kwargs) -> Retriever: | |
""" | |
Create a retriever instance. | |
Args: | |
retriever_type: Type of retriever ("unified" or "modular_unified") | |
**kwargs: Arguments to pass to the retriever constructor | |
Returns: | |
Instantiated Retriever | |
Raises: | |
ValueError: If retriever type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if retriever_type not in cls._RETRIEVERS: | |
available = list(cls._RETRIEVERS.keys()) | |
raise ValueError( | |
f"Unknown retriever type '{retriever_type}'. " | |
f"Available retrievers: {available}" | |
) | |
retriever_module_path = cls._RETRIEVERS[retriever_type] | |
retriever_class = cls._get_component_class(retriever_module_path) | |
try: | |
logger.debug(f"Creating {retriever_type} retriever with args: {kwargs}") | |
# Special handling for retrievers that need embedder + config pattern | |
if retriever_type in ["modular_unified"]: | |
# Extract embedder and config from kwargs | |
embedder = kwargs.pop("embedder", None) | |
if embedder is None: | |
raise ValueError(f"ModularUnifiedRetriever requires 'embedder' parameter") | |
# All remaining kwargs become the config | |
config = kwargs | |
# Extract actual config if it's wrapped in a 'config' key | |
if "config" in config and len(config) == 1: | |
actual_config = config["config"] | |
else: | |
actual_config = config | |
# Transform advanced configuration to ModularUnifiedRetriever format if needed | |
if cls._is_advanced_config(actual_config): | |
actual_config = cls._transform_advanced_config(actual_config) | |
# Use the actual config for retriever creation | |
config = actual_config | |
return cls._create_with_tracking( | |
retriever_class, | |
f"retriever_{retriever_type}", | |
config=config, | |
embedder=embedder | |
) | |
else: | |
return cls._create_with_tracking( | |
retriever_class, | |
f"retriever_{retriever_type}", | |
**kwargs | |
) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create retriever '{retriever_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def _is_advanced_config(cls, config: Dict[str, Any]) -> bool: | |
""" | |
Check if the configuration contains advanced features that need transformation. | |
Args: | |
config: Configuration dictionary to check | |
Returns: | |
True if advanced configuration is detected | |
""" | |
# Check for advanced configuration indicators | |
advanced_indicators = [ | |
"backends", | |
"neural_reranking", | |
"graph_retrieval", | |
"analytics", | |
"experiments" | |
] | |
# Check for new format neural reranking (reranker.type == "neural") | |
reranker_config = config.get("reranker", {}) | |
if reranker_config.get("type") == "neural": | |
return True | |
# Check for new format graph fusion (fusion.type == "graph_enhanced_rrf") | |
fusion_config = config.get("fusion", {}) | |
if fusion_config.get("type") == "graph_enhanced_rrf": | |
return True | |
return any(indicator in config for indicator in advanced_indicators) | |
def _transform_advanced_config(cls, config: Dict[str, Any]) -> Dict[str, Any]: | |
""" | |
Transform advanced configuration format to ModularUnifiedRetriever format. | |
Args: | |
config: Advanced configuration dictionary | |
Returns: | |
Transformed configuration for ModularUnifiedRetriever | |
""" | |
logger.debug("Transforming advanced configuration for ModularUnifiedRetriever") | |
# Extract base configuration elements | |
transformed_config = {} | |
# Configure vector index based on backend configuration | |
if "backends" in config: | |
transformed_config["vector_index"] = cls._transform_vector_index_config(config["backends"]) | |
# Configure sparse retriever (BM25) | |
transformed_config["sparse"] = { | |
"type": "bm25", | |
"config": { | |
"k1": 1.2, | |
"b": 0.75, | |
"lowercase": True, | |
"preserve_technical_terms": True, | |
} | |
} | |
# Configure fusion strategy | |
transformed_config["fusion"] = cls._transform_fusion_config(config) | |
# Configure reranker | |
transformed_config["reranker"] = cls._transform_reranker_config(config) | |
# Copy any other configuration that doesn't need transformation | |
for key, value in config.items(): | |
if key not in ["backends", "neural_reranking", "graph_retrieval", "analytics", "experiments", "reranker"]: | |
transformed_config[key] = value | |
logger.debug(f"Transformed advanced config: {transformed_config}") | |
return transformed_config | |
def _transform_vector_index_config(cls, backends_config: Dict[str, Any]) -> Dict[str, Any]: | |
"""Transform backend configuration to vector index configuration.""" | |
primary_backend = backends_config.get("primary_backend", "faiss") | |
if primary_backend == "weaviate": | |
if "weaviate" in backends_config: | |
weaviate_config = backends_config["weaviate"] | |
if isinstance(weaviate_config, dict): | |
return { | |
"type": "weaviate", | |
"config": weaviate_config | |
} | |
else: | |
# Handle config object with to_dict method | |
return { | |
"type": "weaviate", | |
"config": weaviate_config.to_dict() if hasattr(weaviate_config, 'to_dict') else weaviate_config | |
} | |
else: | |
logger.warning("Weaviate backend selected but no weaviate config found, falling back to FAISS") | |
return { | |
"type": "faiss", | |
"config": backends_config.get("faiss", { | |
"index_type": "IndexFlatIP", | |
"normalize_embeddings": True, | |
"metric": "cosine" | |
}) | |
} | |
else: | |
# Default to FAISS | |
return { | |
"type": "faiss", | |
"config": backends_config.get("faiss", { | |
"index_type": "IndexFlatIP", | |
"normalize_embeddings": True, | |
"metric": "cosine" | |
}) | |
} | |
def _transform_fusion_config(cls, config: Dict[str, Any]) -> Dict[str, Any]: | |
"""Transform fusion configuration with graph enhancement support.""" | |
# Get hybrid search configuration | |
hybrid_search = config.get("hybrid_search", {}) | |
# Base fusion configuration | |
base_fusion_config = { | |
"k": hybrid_search.get("rrf_k", 60), | |
"weights": { | |
"dense": hybrid_search.get("dense_weight", 0.7), | |
"sparse": hybrid_search.get("sparse_weight", 0.3), | |
}, | |
} | |
# Check if graph retrieval is enabled | |
graph_retrieval = config.get("graph_retrieval", {}) | |
if graph_retrieval.get("enabled", False): | |
graph_enhanced_config = { | |
"base_fusion": base_fusion_config, | |
"graph_enhancement": { | |
"enabled": True, | |
"graph_weight": 0.1, | |
"entity_boost": 0.15, | |
"relationship_boost": 0.1, | |
"similarity_threshold": graph_retrieval.get("similarity_threshold", 0.7), | |
"max_graph_hops": graph_retrieval.get("max_graph_hops", 3) | |
} | |
} | |
return { | |
"type": "graph_enhanced_rrf", | |
"config": graph_enhanced_config | |
} | |
else: | |
# Use standard fusion - check for explicit fusion.type first, then fall back to hybrid_search | |
explicit_fusion = config.get("fusion", {}) | |
fusion_type = explicit_fusion.get("type") or hybrid_search.get("fusion_method", "rrf") | |
# If explicit fusion config exists, use it; otherwise use base config | |
fusion_config = explicit_fusion.get("config", base_fusion_config) if explicit_fusion.get("type") else base_fusion_config | |
return { | |
"type": fusion_type, | |
"config": fusion_config | |
} | |
def _transform_reranker_config(cls, config: Dict[str, Any]) -> Dict[str, Any]: | |
"""Transform reranker configuration with neural reranking support.""" | |
# Check if neural reranking is enabled (support both old and new format) | |
neural_reranking = config.get("neural_reranking", {}) | |
reranker_config = config.get("reranker", {}) | |
# Check new format first (reranker.type == "neural") | |
if reranker_config.get("type") == "neural" and reranker_config.get("config", {}).get("enabled", False): | |
neural_config_source = reranker_config.get("config", {}) | |
# Check old format (neural_reranking.enabled) | |
elif neural_reranking.get("enabled", False): | |
neural_config_source = neural_reranking | |
else: | |
neural_config_source = None | |
if neural_config_source: | |
# Convert neural reranking config to proper NeuralReranker format | |
neural_config = { | |
"enabled": True, | |
"max_candidates": neural_config_source.get("max_candidates", 50), | |
"default_model": neural_config_source.get("default_model", "default_model"), | |
"models": neural_config_source.get("models", { | |
"default_model": { | |
"name": neural_config_source.get("model_name", "cross-encoder/ms-marco-MiniLM-L6-v2"), | |
"max_length": neural_config_source.get("max_length", 512), | |
"batch_size": neural_config_source.get("batch_size", 16), | |
"device": "mps" if neural_config_source.get("device", "auto") == "auto" else neural_config_source.get("device", "auto") | |
} | |
}), | |
"performance": { | |
"target_latency_ms": 200, | |
"max_latency_ms": neural_config_source.get("max_latency_ms", 1000), | |
"enable_caching": True, | |
"max_cache_size": 10000 | |
}, | |
"score_fusion": { | |
"method": "weighted", | |
"weights": { | |
"neural_score": 0.7, | |
"retrieval_score": 0.3, | |
"graph_score": 0.0, | |
"temporal_score": 0.0 | |
} | |
}, | |
"adaptive": { | |
"enabled": True, | |
"confidence_threshold": 0.7 | |
} | |
} | |
logger.info(f"✅ ComponentFactory: Neural reranker config transformed successfully") | |
logger.debug(f"Neural reranker config: {neural_config}") | |
return { | |
"type": "neural", | |
"config": neural_config | |
} | |
else: | |
# Use identity reranker | |
return { | |
"type": "identity", | |
"config": {"enabled": True} | |
} | |
def create_generator(cls, generator_type: str, **kwargs) -> AnswerGenerator: | |
""" | |
Create an answer generator instance. | |
Args: | |
generator_type: Type of generator ("adaptive") | |
**kwargs: Arguments to pass to the generator constructor | |
Returns: | |
Instantiated AnswerGenerator | |
Raises: | |
ValueError: If generator type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if generator_type not in cls._GENERATORS: | |
available = list(cls._GENERATORS.keys()) | |
raise ValueError( | |
f"Unknown generator type '{generator_type}'. " | |
f"Available generators: {available}" | |
) | |
generator_module_path = cls._GENERATORS[generator_type] | |
generator_class = cls._get_component_class(generator_module_path) | |
try: | |
logger.debug(f"Creating {generator_type} generator with args: {kwargs}") | |
return cls._create_with_tracking( | |
generator_class, | |
f"generator_{generator_type}", | |
**kwargs | |
) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create generator '{generator_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def create_query_processor(cls, processor_type: str, **kwargs) -> QueryProcessor: | |
""" | |
Create a query processor instance. | |
Args: | |
processor_type: Type of query processor ("modular") | |
**kwargs: Arguments to pass to the processor constructor | |
Returns: | |
Instantiated QueryProcessor | |
Raises: | |
ValueError: If processor type is not supported | |
TypeError: If constructor arguments are invalid | |
""" | |
if processor_type not in cls._QUERY_PROCESSORS: | |
available = list(cls._QUERY_PROCESSORS.keys()) | |
raise ValueError( | |
f"Unknown query processor type '{processor_type}'. " | |
f"Available query processors: {available}" | |
) | |
processor_module_path = cls._QUERY_PROCESSORS[processor_type] | |
processor_class = cls._get_component_class(processor_module_path) | |
try: | |
logger.debug(f"Creating {processor_type} query processor with args: {kwargs}") | |
return cls._create_with_tracking( | |
processor_class, | |
f"query_processor_{processor_type}", | |
**kwargs | |
) | |
except Exception as e: | |
raise TypeError( | |
f"Failed to create query processor '{processor_type}': {e}. " | |
f"Check constructor arguments: {kwargs}" | |
) from e | |
def is_supported(cls, component_type: str, name: str) -> bool: | |
""" | |
Check if a component type and name are supported. | |
Args: | |
component_type: Type of component ('processor', 'embedder', 'vector_store', | |
'retriever', 'generator') | |
name: Component name to check | |
Returns: | |
True if component is supported, False otherwise | |
""" | |
type_mappings = { | |
'processor': cls._PROCESSORS, | |
'embedder': cls._EMBEDDERS, | |
'vector_store': cls._VECTOR_STORES, | |
'retriever': cls._RETRIEVERS, | |
'generator': cls._GENERATORS, | |
'query_processor': cls._QUERY_PROCESSORS | |
} | |
mapping = type_mappings.get(component_type) | |
if mapping is None: | |
return False | |
return name in mapping | |
def get_all_supported_components(cls) -> Dict[str, list[str]]: | |
"""Get all supported components organized by type (alias for get_available_components).""" | |
return cls.get_available_components() | |
def get_available_components(cls) -> Dict[str, list[str]]: | |
""" | |
Get all available components organized by type. | |
Returns: | |
Dictionary mapping component types to lists of available component names | |
""" | |
return { | |
"processors": list(cls._PROCESSORS.keys()), | |
"embedders": list(cls._EMBEDDERS.keys()), | |
"vector_stores": list(cls._VECTOR_STORES.keys()), | |
"retrievers": list(cls._RETRIEVERS.keys()), | |
"generators": list(cls._GENERATORS.keys()), | |
"query_processors": list(cls._QUERY_PROCESSORS.keys()), | |
} | |
def validate_configuration(cls, config: Dict[str, Any]) -> list[str]: | |
""" | |
Validate component configuration. | |
Args: | |
config: Configuration dictionary with component specifications | |
Returns: | |
List of validation errors (empty if valid) | |
""" | |
errors = [] | |
# Component type mappings for validation | |
required_components = { | |
'document_processor': 'processors', | |
'embedder': 'embedders', | |
'retriever': 'retrievers', | |
'answer_generator': 'generators' | |
} | |
# vector_store is optional in unified architecture | |
optional_components = { | |
'vector_store': 'vector_stores' | |
} | |
all_components = {**required_components, **optional_components} | |
available = cls.get_available_components() | |
# Check required components | |
for comp_key, comp_type_key in required_components.items(): | |
if comp_key not in config: | |
errors.append(f"Missing required component: {comp_key}") | |
continue | |
comp_config = config[comp_key] | |
if not isinstance(comp_config, dict) or 'type' not in comp_config: | |
errors.append(f"Invalid configuration for {comp_key}: missing 'type' field") | |
continue | |
comp_type = comp_config['type'] | |
if comp_type not in available[comp_type_key]: | |
errors.append( | |
f"Unknown {comp_key} type '{comp_type}'. " | |
f"Available: {available[comp_type_key]}" | |
) | |
# Check optional components if present | |
for comp_key, comp_type_key in optional_components.items(): | |
if comp_key in config: | |
comp_config = config[comp_key] | |
if not isinstance(comp_config, dict) or 'type' not in comp_config: | |
errors.append(f"Invalid configuration for {comp_key}: missing 'type' field") | |
continue | |
comp_type = comp_config['type'] | |
if comp_type not in available[comp_type_key]: | |
errors.append( | |
f"Unknown {comp_key} type '{comp_type}'. " | |
f"Available: {available[comp_type_key]}" | |
) | |
return errors |