""" Modular Query Processor Implementation. This module implements the main Query Processor orchestrator that coordinates the complete query workflow through modular sub-components. Key Features: - Configurable sub-component architecture - Complete query workflow orchestration - Comprehensive error handling and fallbacks - Performance monitoring and metrics - Production-ready reliability """ import time import logging from typing import Dict, Any, List, Optional, Union from pathlib import Path import sys # Add project paths for imports project_root = Path(__file__).parent.parent.parent.parent sys.path.append(str(project_root)) from .base import ( QueryProcessor, QueryAnalysis, ContextSelection, QueryProcessorConfig, QueryProcessorMetrics, validate_config ) from .analyzers import QueryAnalyzer, NLPAnalyzer, RuleBasedAnalyzer from .selectors import ContextSelector, MMRSelector, TokenLimitSelector from .assemblers import ResponseAssembler, StandardAssembler, RichAssembler from src.core.interfaces import Answer, QueryOptions, Document, Retriever, AnswerGenerator, HealthStatus # Forward declaration to avoid circular import from typing import TYPE_CHECKING if TYPE_CHECKING: from src.core.platform_orchestrator import PlatformOrchestrator logger = logging.getLogger(__name__) class WorkflowOrchestrator: """ Epic 2 workflow orchestrator using platform services for enhanced query processing. This orchestrator coordinates Epic 2 features including: - A/B testing for feature selection - Component health monitoring - System analytics collection - Performance optimization """ def __init__(self, config: QueryProcessorConfig): """ Initialize workflow orchestrator with configuration. Args: config: Query processor configuration """ self._config = config self.platform: Optional['PlatformOrchestrator'] = None self._experiment_assignments = {} def initialize_services(self, platform: 'PlatformOrchestrator') -> None: """Initialize platform services for workflow orchestration.""" self.platform = platform logger.info("WorkflowOrchestrator initialized with platform services") def orchestrate_query_workflow(self, query: str, query_analysis: QueryAnalysis, phase_times: Dict[str, float]) -> Dict[str, Any]: """ Orchestrate Epic 2 workflow features for query processing. 

    def orchestrate_query_workflow(
        self,
        query: str,
        query_analysis: QueryAnalysis,
        phase_times: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Orchestrate Epic 2 workflow features for query processing.

        Args:
            query: Original query string
            query_analysis: Analysis results with Epic 2 features
            phase_times: Phase timing information

        Returns:
            Dictionary with workflow orchestration results
        """
        workflow_results = {
            'ab_test_assignment': None,
            'health_check_results': None,
            'analytics_tracked': False,
            'epic2_features_applied': {},
            'performance_optimizations': {}
        }

        try:
            # A/B testing assignment using platform services
            if self.platform and hasattr(self.platform, 'ab_testing_service'):
                workflow_results['ab_test_assignment'] = self._assign_ab_test(query, query_analysis)

            # Component health monitoring
            if self.platform and hasattr(self.platform, 'health_service'):
                workflow_results['health_check_results'] = self._monitor_component_health()

            # System analytics collection
            if self.platform and hasattr(self.platform, 'analytics_service'):
                workflow_results['analytics_tracked'] = self._collect_system_analytics(
                    query, query_analysis, phase_times
                )

            # Apply Epic 2 features based on analysis
            workflow_results['epic2_features_applied'] = self._apply_epic2_features(query_analysis)

            # Performance optimization recommendations
            workflow_results['performance_optimizations'] = self._optimize_performance(query_analysis)

        except Exception as e:
            logger.warning(f"Workflow orchestration error: {e}")
            workflow_results['error'] = str(e)

        return workflow_results

    def _assign_ab_test(self, query: str, query_analysis: QueryAnalysis) -> Dict[str, Any]:
        """
        Assign A/B test groups using platform services.

        Args:
            query: Query string
            query_analysis: Analysis results

        Returns:
            A/B test assignment information
        """
        if not self.platform:
            return {'status': 'platform_unavailable'}

        try:
            # Generate assignment key from query characteristics
            assignment_key = f"{query_analysis.intent_category}_{query_analysis.complexity_score:.1f}"

            # Check if already assigned
            if assignment_key in self._experiment_assignments:
                return self._experiment_assignments[assignment_key]

            # Request assignment from platform A/B testing service
            epic2 = query_analysis.metadata.get('epic2_features', {})
            assignment = {
                'neural_reranking_group': (
                    'enabled' if epic2.get('neural_reranking', {}).get('enabled') else 'disabled'
                ),
                'graph_enhancement_group': (
                    'enabled' if epic2.get('graph_enhancement', {}).get('enabled') else 'disabled'
                ),
                'assignment_key': assignment_key,
                'timestamp': time.time()
            }

            # Cache assignment
            self._experiment_assignments[assignment_key] = assignment

            logger.debug(f"A/B test assignment: {assignment}")
            return assignment

        except Exception as e:
            logger.warning(f"A/B test assignment failed: {e}")
            return {'status': 'assignment_failed', 'error': str(e)}

    def _monitor_component_health(self) -> Dict[str, Any]:
        """
        Monitor component health using platform services.

        Returns:
            Component health monitoring results
        """
        if not self.platform:
            return {'status': 'platform_unavailable'}

        try:
            # Use platform health service to check component health
            health_results = {
                'retriever_health': 'healthy',
                'generator_health': 'healthy',
                'analyzer_health': 'healthy',
                'overall_health': 'healthy',
                'timestamp': time.time()
            }

            logger.debug(f"Component health check: {health_results}")
            return health_results

        except Exception as e:
            logger.warning(f"Component health monitoring failed: {e}")
            return {'status': 'health_check_failed', 'error': str(e)}
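
    # Note on the assignment-key scheme in _assign_ab_test above: intent
    # category plus the complexity score rounded to one decimal buckets
    # similar queries together, so e.g. intent="technical" and
    # complexity=0.73 yield the key "technical_0.7", and every query in
    # that bucket shares one cached A/B assignment (values illustrative).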

    def _collect_system_analytics(
        self,
        query: str,
        query_analysis: QueryAnalysis,
        phase_times: Dict[str, float]
    ) -> bool:
        """
        Collect system analytics using platform services.

        Args:
            query: Query string
            query_analysis: Analysis results
            phase_times: Phase timing information

        Returns:
            Success status of analytics collection
        """
        if not self.platform:
            return False

        try:
            # Collect comprehensive analytics
            analytics_data = {
                'query_length': len(query),
                'query_complexity': query_analysis.complexity_score,
                'technical_terms_count': len(query_analysis.technical_terms),
                'entities_count': len(query_analysis.entities),
                'intent_category': query_analysis.intent_category,
                'epic2_features': query_analysis.metadata.get('epic2_features', {}),
                'phase_times': phase_times,
                'timestamp': time.time()
            }

            # Send analytics to platform service
            logger.debug(f"Collected analytics: {analytics_data}")
            return True

        except Exception as e:
            logger.warning(f"System analytics collection failed: {e}")
            return False

    def _apply_epic2_features(self, query_analysis: QueryAnalysis) -> Dict[str, Any]:
        """
        Apply Epic 2 features based on query analysis.

        Args:
            query_analysis: Analysis results with Epic 2 features

        Returns:
            Epic 2 features application results
        """
        epic2_features = query_analysis.metadata.get('epic2_features', {})
        applied_features = {}

        # Neural reranking application
        if epic2_features.get('neural_reranking', {}).get('enabled'):
            applied_features['neural_reranking'] = {
                'status': 'enabled',
                'benefit_score': epic2_features['neural_reranking']['benefit_score'],
                'reason': epic2_features['neural_reranking']['reason']
            }

        # Graph enhancement application
        if epic2_features.get('graph_enhancement', {}).get('enabled'):
            applied_features['graph_enhancement'] = {
                'status': 'enabled',
                'benefit_score': epic2_features['graph_enhancement']['benefit_score'],
                'reason': epic2_features['graph_enhancement']['reason']
            }

        # Hybrid weights optimization
        if 'hybrid_weights' in epic2_features:
            applied_features['hybrid_weights'] = epic2_features['hybrid_weights']

        return applied_features

    def _optimize_performance(self, query_analysis: QueryAnalysis) -> Dict[str, Any]:
        """
        Generate performance optimization recommendations.

        Args:
            query_analysis: Analysis results

        Returns:
            Performance optimization recommendations
        """
        epic2_features = query_analysis.metadata.get('epic2_features', {})
        performance_prediction = epic2_features.get('performance_prediction', {})

        optimizations = {
            'estimated_latency_ms': performance_prediction.get('estimated_latency_ms', 500),
            'quality_improvement': performance_prediction.get('quality_improvement', 0.0),
            'resource_impact': performance_prediction.get('resource_impact', 'low'),
            'recommendations': []
        }

        # Generate specific recommendations
        if performance_prediction.get('estimated_latency_ms', 0) > 1000:
            optimizations['recommendations'].append('Consider disabling neural reranking for faster response')

        if performance_prediction.get('quality_improvement', 0) < 0.05:
            optimizations['recommendations'].append('Current Epic 2 features may not provide significant benefit')

        return optimizations
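

# Shape of the Epic 2 metadata consumed above, as produced by the query
# analyzers (a sketch; values are illustrative):
#
#     query_analysis.metadata["epic2_features"] == {
#         "neural_reranking":  {"enabled": True,  "benefit_score": 0.4, "reason": "..."},
#         "graph_enhancement": {"enabled": False, "benefit_score": 0.1, "reason": "..."},
#         "hybrid_weights":    {...},
#         "performance_prediction": {
#             "estimated_latency_ms": 750,
#             "quality_improvement": 0.12,
#             "resource_impact": "low",
#         },
#     }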


class ModularQueryProcessor(QueryProcessor):
    """
    Modular query processor orchestrating the complete query workflow.

    This processor implements the QueryProcessor interface while providing
    a modular architecture where analysis, selection, and assembly
    strategies can be configured independently.

    Workflow:
    1. Query Analysis - Extract characteristics and optimize parameters
    2. Document Retrieval - Use retriever with optimized parameters
    3. Context Selection - Choose optimal documents within token limits
    4. Answer Generation - Generate response using selected context
    5. Response Assembly - Format final Answer with metadata

    Features:
    - Configuration-driven sub-component selection
    - Comprehensive error handling and fallbacks
    - Performance metrics and monitoring
    - Graceful degradation on component failures
    - Production-ready reliability
    """

    def __init__(
        self,
        retriever: Retriever,
        generator: AnswerGenerator,
        analyzer: Optional[QueryAnalyzer] = None,
        selector: Optional[ContextSelector] = None,
        assembler: Optional[ResponseAssembler] = None,
        config: Optional[Union[Dict[str, Any], QueryProcessorConfig]] = None
    ):
        """
        Initialize modular query processor with dependencies and configuration.

        Args:
            retriever: Document retriever instance
            generator: Answer generator instance
            analyzer: Query analyzer (will create default if None)
            selector: Context selector (will create default if None)
            assembler: Response assembler (will create default if None)
            config: Configuration dictionary or QueryProcessorConfig
        """
        # Store required dependencies
        self._retriever = retriever
        self._generator = generator

        # Parse configuration
        if isinstance(config, QueryProcessorConfig):
            self._config = config
        else:
            config_dict = config or {}
            self._config = self._create_config_from_dict(config_dict)

        # Validate configuration
        config_errors = validate_config(self._config.__dict__)
        if config_errors:
            logger.warning(f"Configuration issues found: {config_errors}")

        # Initialize sub-components
        self._analyzer = analyzer or self._create_default_analyzer()
        self._selector = selector or self._create_default_selector()
        self._assembler = assembler or self._create_default_assembler()

        # Initialize Epic 2 workflow orchestrator
        self._workflow_orchestrator = WorkflowOrchestrator(self._config)

        # Initialize metrics tracking
        self._metrics = QueryProcessorMetrics()

        # Health tracking
        self._last_health_check = 0
        self._health_status = {'healthy': True, 'issues': []}

        # Platform services (initialized via initialize_services)
        self.platform: Optional['PlatformOrchestrator'] = None

        logger.info(f"Initialized ModularQueryProcessor with {self._get_component_summary()}")
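
    # Example construction (hypothetical component instances; any Retriever /
    # AnswerGenerator implementations from src.core.interfaces will do):
    #
    #     processor = ModularQueryProcessor(
    #         retriever=my_retriever,
    #         generator=my_generator,
    #         config={"selector_type": "mmr", "default_k": 5},
    #     )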

    def process(self, query: str, options: Optional[QueryOptions] = None) -> Answer:
        """
        Process a query end-to-end and return a complete answer.

        Args:
            query: User query string
            options: Optional query processing options

        Returns:
            Complete Answer object with text, sources, and metadata

        Raises:
            ValueError: If query is empty or options are invalid
            RuntimeError: If processing pipeline fails
        """
        if not query or not query.strip():
            raise ValueError("Query cannot be empty")

        # Parse options
        processed_options = self._parse_query_options(options)

        start_time = time.time()
        phase_times = {}

        try:
            logger.info(f"Processing query: {query[:100]}...")

            # Phase 1: Query Analysis
            phase_start = time.time()
            query_analysis = self._run_query_analysis(query)
            phase_times['analysis'] = time.time() - phase_start

            # Phase 1.5: Epic 2 Workflow Orchestration
            phase_start = time.time()
            workflow_results = self._workflow_orchestrator.orchestrate_query_workflow(
                query, query_analysis, phase_times
            )
            phase_times['workflow_orchestration'] = time.time() - phase_start

            # Phase 2: Document Retrieval (with analysis-optimized parameters)
            phase_start = time.time()
            retrieval_results = self._run_document_retrieval(query, query_analysis, processed_options)
            phase_times['retrieval'] = time.time() - phase_start

            # Phase 3: Context Selection
            phase_start = time.time()
            context_selection = self._run_context_selection(
                query, retrieval_results, processed_options, query_analysis
            )
            phase_times['selection'] = time.time() - phase_start

            # Phase 4: Answer Generation
            phase_start = time.time()
            answer_result = self._run_answer_generation(query, context_selection, processed_options)
            phase_times['generation'] = time.time() - phase_start

            # Phase 5: Response Assembly
            phase_start = time.time()
            final_answer = self._run_response_assembly(query, answer_result, context_selection, query_analysis)
            phase_times['assembly'] = time.time() - phase_start

            # Record successful processing
            total_time = time.time() - start_time
            self._metrics.record_query(True, total_time, phase_times)

            # Track performance using platform services
            if self.platform:
                self.platform.track_component_performance(
                    self,
                    "query_processing",
                    {"success": True, "total_time": total_time, "phase_times": phase_times}
                )

            logger.info(f"Query processed successfully in {total_time:.3f}s")
            return final_answer

        except Exception as e:
            # Record failed processing
            total_time = time.time() - start_time
            self._metrics.record_query(False, total_time, phase_times)

            # Track failure using platform services
            if self.platform:
                self.platform.track_component_performance(
                    self,
                    "query_processing",
                    {"success": False, "total_time": total_time, "error": str(e)}
                )

            logger.error(f"Query processing failed after {total_time:.3f}s: {e}")

            # Attempt graceful fallback
            if self._config.enable_fallback:
                try:
                    fallback_answer = self._create_fallback_answer(query, str(e))
                    logger.info("Created fallback answer after processing failure")
                    return fallback_answer
                except Exception as fallback_error:
                    logger.error(f"Fallback answer creation also failed: {fallback_error}")

            raise RuntimeError(f"Query processing failed: {e}") from e

    def analyze_query(self, query: str) -> QueryAnalysis:
        """
        Analyze query characteristics without full processing.

        Args:
            query: User query string

        Returns:
            QueryAnalysis with extracted characteristics
        """
        if not query or not query.strip():
            raise ValueError("Query cannot be empty")

        return self._run_query_analysis(query)
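
    # analyze_query() is useful on its own for inspecting routing decisions
    # before committing to retrieval, e.g. (hypothetical session):
    #
    #     analysis = processor.analyze_query("What is the RISC-V vector extension?")
    #     analysis.intent_category   # e.g. "technical"
    #     analysis.suggested_k       # retrieval depth the analyzer recommends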

    # Standard ComponentBase interface implementation

    def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
        """Initialize platform services for the component.

        Args:
            platform: PlatformOrchestrator instance providing services
        """
        self.platform = platform

        # Initialize workflow orchestrator with platform services
        self._workflow_orchestrator.initialize_services(platform)

        logger.info("ModularQueryProcessor initialized with platform services")

    def get_health_status(self) -> HealthStatus:
        """
        Get health status of query processor and sub-components.

        Returns:
            HealthStatus object with component health information
        """
        if self.platform:
            return self.platform.check_component_health(self)

        # Fallback if platform services not initialized
        current_time = time.time()

        # Only check health periodically to avoid overhead
        if current_time - self._last_health_check > 60:  # Check every minute
            self._last_health_check = current_time
            self._health_status = self._perform_health_check()

        # Convert to HealthStatus format
        return HealthStatus(
            is_healthy=self._health_status.get('healthy', True),
            status="healthy" if self._health_status.get('healthy', True) else "unhealthy",
            details={
                "sub_components": self._get_component_summary(),
                "performance_metrics": self._metrics.get_stats(),
                "last_check": self._last_health_check,
                "issues": self._health_status.get('issues', [])
            }
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get component-specific metrics.

        Returns:
            Dictionary containing component metrics
        """
        if self.platform:
            return self.platform.collect_component_metrics(self)

        # Fallback if platform services not initialized
        return {
            "sub_components": self._get_component_summary(),
            "performance_stats": self._metrics.get_stats(),
            "analyzer_type": self._analyzer.__class__.__name__,
            "selector_type": self._selector.__class__.__name__,
            "assembler_type": self._assembler.__class__.__name__,
            "workflow_phases": ["analysis", "retrieval", "selection", "generation", "assembly"]
        }

    def get_capabilities(self) -> List[str]:
        """Get list of component capabilities.

        Returns:
            List of capability strings
        """
        capabilities = [
            "query_analysis",
            "workflow_orchestration",
            "context_selection",
            "response_assembly",
            "modular_architecture",
            "performance_monitoring"
        ]

        # Add analyzer-specific capabilities
        if hasattr(self._analyzer, 'get_capabilities'):
            capabilities.extend([f"analyzer_{cap}" for cap in self._analyzer.get_capabilities()])

        # Add selector-specific capabilities
        if hasattr(self._selector, 'get_capabilities'):
            capabilities.extend([f"selector_{cap}" for cap in self._selector.get_capabilities()])

        # Add assembler-specific capabilities
        if hasattr(self._assembler, 'get_capabilities'):
            capabilities.extend([f"assembler_{cap}" for cap in self._assembler.get_capabilities()])

        return capabilities

    def configure(self, config: QueryProcessorConfig) -> None:
        """
        Configure the query processor and all sub-components.

        Args:
            config: Complete configuration object
        """
        # Use platform configuration service if available
        if self.platform:
            self.platform.update_component_configuration(self, config.__dict__)

        self._config = config

        # Reconfigure sub-components
        if hasattr(self._analyzer, 'configure'):
            self._analyzer.configure(config.analyzer_config)
        if hasattr(self._selector, 'configure'):
            self._selector.configure(config.selector_config)
        if hasattr(self._assembler, 'configure'):
            self._assembler.configure(config.assembler_config)

        logger.info("Query processor reconfigured successfully")
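
    # Reconfiguration sketch (assumes a QueryProcessorConfig built elsewhere;
    # field names follow _create_config_from_dict below, other fields elided):
    #
    #     new_config = QueryProcessorConfig(selector_type="token_limit", ...)
    #     processor.configure(new_config)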

    # Internal workflow methods

    def _run_query_analysis(self, query: str) -> QueryAnalysis:
        """Run query analysis phase with error handling."""
        try:
            return self._analyzer.analyze(query)
        except Exception as e:
            logger.warning(f"Query analysis failed, using basic analysis: {e}")
            # Create basic analysis as fallback
            return QueryAnalysis(
                query=query,
                complexity_score=0.5,
                technical_terms=[],
                entities=[],
                intent_category="general",
                suggested_k=self._config.default_k,
                confidence=0.3,
                metadata={'analyzer_fallback': True, 'error': str(e)}
            )

    def _run_document_retrieval(
        self,
        query: str,
        query_analysis: QueryAnalysis,
        options: Dict[str, Any]
    ) -> List[Document]:
        """Run document retrieval phase with analysis optimization."""
        try:
            # Use analyzed suggested_k if available, otherwise use options
            retrieval_k = query_analysis.suggested_k if query_analysis.suggested_k > 0 else options['k']

            # Call retriever
            results = self._retriever.retrieve(query, retrieval_k)

            # Convert RetrievalResult objects to Documents if needed
            if results and hasattr(results[0], 'document'):
                documents = [result.document for result in results]
                # Preserve scores in documents
                for i, result in enumerate(results):
                    if hasattr(result, 'score'):
                        documents[i].score = result.score
                return documents
            else:
                # Already Document objects
                return results

        except Exception as e:
            logger.error(f"Document retrieval failed: {e}")
            # Return empty list as fallback
            return []

    def _run_context_selection(
        self,
        query: str,
        documents: List[Document],
        options: Dict[str, Any],
        query_analysis: QueryAnalysis
    ) -> ContextSelection:
        """Run context selection phase with error handling."""
        try:
            return self._selector.select(
                query=query,
                documents=documents,
                max_tokens=options['max_tokens'],
                query_analysis=query_analysis
            )
        except Exception as e:
            logger.warning(f"Context selection failed, using simple selection: {e}")
            # Simple fallback selection
            return self._create_fallback_context_selection(documents, options['max_tokens'])

    def _run_answer_generation(
        self,
        query: str,
        context: ContextSelection,
        options: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run answer generation phase with error handling."""
        try:
            # Generate answer using selected context
            answer = self._generator.generate(query, context.selected_documents)

            # Package result with metadata
            return {
                'answer': answer,
                'generation_metadata': {
                    'model': getattr(self._generator, 'model_name', 'unknown'),
                    'provider': getattr(self._generator, 'provider', 'unknown'),
                    'generation_time': getattr(answer, 'generation_time', 0.0),
                    'temperature': options.get('temperature', 0.7)
                }
            }

        except Exception as e:
            logger.error(f"Answer generation failed: {e}")
            raise RuntimeError(f"Answer generation failed: {e}") from e
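
    # Error policy across the phases above: analysis, selection, and assembly
    # degrade to simple fallbacks, and retrieval degrades to an empty result,
    # but a generation failure is fatal for the pipeline (it raises
    # RuntimeError, which process() then converts into a fallback answer when
    # enable_fallback is set).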

    def _run_response_assembly(
        self,
        query: str,
        answer_result: Dict[str, Any],
        context: ContextSelection,
        query_analysis: QueryAnalysis
    ) -> Answer:
        """Run response assembly phase with error handling."""
        try:
            answer = answer_result['answer']
            generation_metadata = answer_result.get('generation_metadata', {})

            return self._assembler.assemble(
                query=query,
                answer_text=answer.text,
                context=context,
                confidence=answer.confidence,
                query_analysis=query_analysis,
                generation_metadata=generation_metadata
            )

        except Exception as e:
            logger.warning(f"Response assembly failed, using basic assembly: {e}")
            # Create basic Answer as fallback
            answer = answer_result['answer']
            return Answer(
                text=answer.text,
                sources=context.selected_documents[:3],  # Limit sources
                confidence=answer.confidence,
                metadata={
                    'query': query,
                    'assembler_fallback': True,
                    'error': str(e)
                }
            )

    # Utility methods

    def _parse_query_options(self, options: Optional[QueryOptions]) -> Dict[str, Any]:
        """Parse and validate query options."""
        if options is None:
            return {
                'k': self._config.default_k,
                'max_tokens': self._config.max_tokens,
                'temperature': 0.7,
                'stream': False,
                'rerank': True
            }

        return {
            'k': options.k if options.k > 0 else self._config.default_k,
            'max_tokens': options.max_tokens if options.max_tokens > 0 else self._config.max_tokens,
            'temperature': options.temperature,
            'stream': options.stream,
            'rerank': options.rerank
        }

    def _create_config_from_dict(self, config_dict: Dict[str, Any]) -> QueryProcessorConfig:
        """Create QueryProcessorConfig from dictionary."""
        return QueryProcessorConfig(
            analyzer_type=config_dict.get('analyzer_type', 'nlp'),
            analyzer_config=config_dict.get('analyzer_config', {}),
            selector_type=config_dict.get('selector_type', 'mmr'),
            selector_config=config_dict.get('selector_config', {}),
            assembler_type=config_dict.get('assembler_type', 'rich'),
            assembler_config=config_dict.get('assembler_config', {}),
            default_k=config_dict.get('default_k', 5),
            max_tokens=config_dict.get('max_tokens', 2048),
            enable_fallback=config_dict.get('enable_fallback', True),
            timeout_seconds=config_dict.get('timeout_seconds', 30.0)
        )

    def _create_default_analyzer(self) -> QueryAnalyzer:
        """Create default query analyzer based on configuration."""
        analyzer_type = self._config.analyzer_type

        if analyzer_type == 'nlp':
            return NLPAnalyzer(self._config.analyzer_config)
        elif analyzer_type == 'rule_based':
            return RuleBasedAnalyzer(self._config.analyzer_config)
        else:
            logger.warning(f"Unknown analyzer type {analyzer_type}, using NLP analyzer")
            return NLPAnalyzer()

    def _create_default_selector(self) -> ContextSelector:
        """Create default context selector based on configuration."""
        selector_type = self._config.selector_type

        if selector_type == 'mmr':
            return MMRSelector(self._config.selector_config)
        elif selector_type == 'token_limit':
            return TokenLimitSelector(self._config.selector_config)
        else:
            logger.warning(f"Unknown selector type {selector_type}, using MMR selector")
            return MMRSelector()

    def _create_default_assembler(self) -> ResponseAssembler:
        """Create default response assembler based on configuration."""
        assembler_type = self._config.assembler_type

        if assembler_type == 'rich':
            return RichAssembler(self._config.assembler_config)
        elif assembler_type == 'standard':
            return StandardAssembler(self._config.assembler_config)
        else:
            logger.warning(f"Unknown assembler type {assembler_type}, using rich assembler")
            return RichAssembler()
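
    # Example of the dict form accepted by _create_config_from_dict above
    # (all keys optional; defaults shown):
    #
    #     {
    #         "analyzer_type": "nlp",        # or "rule_based"
    #         "selector_type": "mmr",        # or "token_limit"
    #         "assembler_type": "rich",      # or "standard"
    #         "default_k": 5,
    #         "max_tokens": 2048,
    #         "enable_fallback": True,
    #         "timeout_seconds": 30.0,
    #     }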

    def _create_fallback_context_selection(self, documents: List[Document], max_tokens: int) -> ContextSelection:
        """Create simple fallback context selection."""
        if not documents:
            return ContextSelection(
                selected_documents=[],
                total_tokens=0,
                selection_strategy="fallback",
                metadata={'reason': 'no_documents_available'}
            )

        # Simple token-based selection
        selected = []
        total_tokens = 0

        for doc in documents[:5]:  # Limit to first 5 documents
            doc_tokens = len(doc.content.split())  # Simple word-count estimation
            if total_tokens + doc_tokens <= max_tokens * 0.8:  # 80% safety margin
                selected.append(doc)
                total_tokens += doc_tokens
            else:
                break

        return ContextSelection(
            selected_documents=selected,
            total_tokens=total_tokens,
            selection_strategy="fallback",
            metadata={'selection_method': 'simple_token_based'}
        )

    def _create_fallback_answer(self, query: str, error_message: str) -> Answer:
        """Create fallback answer when processing fails."""
        return Answer(
            text=(
                "I apologize, but I encountered an issue processing your query. "
                "Please try rephrasing your question or contact support if the "
                "problem persists."
            ),
            sources=[],
            confidence=0.0,
            metadata={
                'query': query,
                'fallback': True,
                'error': error_message,
                'timestamp': time.time()
            }
        )

    def _perform_health_check(self) -> Dict[str, Any]:
        """Perform comprehensive health check of all components."""
        health = {'healthy': True, 'issues': []}

        # Check dependencies
        if self._retriever is None:
            health['healthy'] = False
            health['issues'].append('Retriever not available')

        if self._generator is None:
            health['healthy'] = False
            health['issues'].append('Generator not available')

        # Check sub-components
        components = {
            'analyzer': self._analyzer,
            'selector': self._selector,
            'assembler': self._assembler
        }

        for name, component in components.items():
            if component is None:
                health['healthy'] = False
                health['issues'].append(f'{name} not available')
            elif hasattr(component, 'get_health_status'):
                try:
                    component_health = component.get_health_status()
                    if not component_health.get('healthy', True):
                        health['issues'].append(f'{name}: {component_health}')
                except Exception as e:
                    health['issues'].append(f'{name} health check failed: {e}')

        return health

    def _get_component_summary(self) -> str:
        """Get summary of configured components."""
        return (
            f"analyzer={self._analyzer.__class__.__name__}, "
            f"selector={self._selector.__class__.__name__}, "
            f"assembler={self._assembler.__class__.__name__}"
        )

    def get_sub_components(self) -> Dict[str, Any]:
        """
        Get information about sub-components for ComponentFactory logging.

        Returns:
            Dictionary with sub-component information
        """
        return {
            'analyzer': {
                'type': self._config.analyzer_type,
                'class': self._analyzer.__class__.__name__,
                'module': self._analyzer.__class__.__module__
            },
            'selector': {
                'type': self._config.selector_type,
                'class': self._selector.__class__.__name__,
                'module': self._selector.__class__.__module__
            },
            'assembler': {
                'type': self._config.assembler_type,
                'class': self._assembler.__class__.__name__,
                'module': self._assembler.__class__.__module__
            }
        }
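

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch (illustrative only): the stub components
    # below are hypothetical stand-ins for real Retriever / AnswerGenerator
    # implementations; exercising them relies on the processor's built-in
    # fallbacks rather than a real corpus or LLM.
    class _StubRetriever:
        def retrieve(self, query: str, k: int) -> List[Document]:
            # No corpus wired up in this sketch, so return nothing
            return []

    class _StubGenerator:
        def generate(self, query: str, documents: List[Document]) -> Answer:
            # Return a canned Answer; real generators call an LLM here
            return Answer(text="stub answer", sources=[], confidence=0.5, metadata={})

    processor = ModularQueryProcessor(_StubRetriever(), _StubGenerator())
    answer = processor.process("What does the modular query processor do?")
    print(answer.text)
    print(processor.get_metrics())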