"""
Modular Answer Generator Implementation.
This module provides the primary AnswerGenerator implementation, which
coordinates all answer generation sub-components (prompt building,
LLM interaction, response parsing, confidence scoring).
Architecture Notes:
- Implements AnswerGenerator interface from core.interfaces
- Coordinates sub-components for flexible generation
- Configuration-driven component selection
- Provides unified interface for answer generation
- Extensive use of adapters for LLM integration
"""
import time
import logging
from typing import List, Dict, Any, Optional, Iterator
from pathlib import Path
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))
from src.core.interfaces import AnswerGenerator as AnswerGeneratorInterface, Document, Answer, HealthStatus
# Imported for type checking only, to avoid a circular import at runtime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.core.platform_orchestrator import PlatformOrchestrator
from .base import ConfigurableComponent, GenerationParams, GenerationError, Citation
# Import sub-component registries
from .prompt_builders import get_builder_class
from .llm_adapters import get_adapter_class
from .response_parsers import get_parser_class
from .confidence_scorers import get_scorer_class
logger = logging.getLogger(__name__)
class AnswerGenerator(AnswerGeneratorInterface, ConfigurableComponent):
"""
Modular answer generator with configurable sub-components.
This generator implements the AnswerGenerator interface while
providing a modular architecture where prompt building, LLM selection,
response parsing, and confidence scoring can be configured independently.
Key Architecture Points:
- ALL LLM clients use adapters (unlike Document Processor)
- Direct implementations for algorithms (prompting, parsing, scoring)
- Configuration-driven component selection
- Supports both structured config and legacy parameters
Features:
- Multiple LLM provider support through adapters
- Configurable prompt strategies
- Flexible response parsing
- Multi-method confidence scoring
- Streaming support (when available)
- Comprehensive error handling
Configuration Structure:
{
"prompt_builder": {
"type": "simple",
"config": {...}
},
"llm_client": {
"type": "ollama",
"config": {...}
},
"response_parser": {
"type": "markdown",
"config": {...}
},
"confidence_scorer": {
"type": "semantic",
"config": {...}
}
}
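Example (illustrative; assumes a local Ollama server and that
`retrieved_docs` is a List[Document] produced by an upstream retriever --
any sub-component omitted from the config falls back to its default):
generator = AnswerGenerator(config={
"llm_client": {"type": "ollama", "config": {"model_name": "llama3.2"}}
})
answer = generator.generate("How does the watchdog timer work?", retrieved_docs)
print(answer.text, answer.confidence)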
"""
def __init__(self,
config: Optional[Dict[str, Any]] = None,
# Legacy parameters for backward compatibility
model_name: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
use_ollama: Optional[bool] = None,
ollama_url: Optional[str] = None,
**kwargs):
"""
Initialize the modular answer generator.
Args:
config: Configuration dictionary for all sub-components
model_name: Legacy parameter for model name
temperature: Legacy parameter for generation temperature
max_tokens: Legacy parameter for max tokens
use_ollama: Legacy parameter to use Ollama
ollama_url: Legacy parameter for Ollama URL
**kwargs: Additional legacy parameters
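Example (illustrative legacy-style call; assumes a local Ollama server,
parameter values are arbitrary):
generator = AnswerGenerator(
model_name="llama3.2",
temperature=0.3,
max_tokens=512,
use_ollama=True,
ollama_url="http://localhost:11434"
)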
"""
# Default configuration
default_config = {
'prompt_builder': {
'type': 'simple',
'config': {
'max_context_length': 4000,
'include_instructions': True,
'citation_style': 'inline'
}
},
'llm_client': {
'type': 'ollama',
'config': {
'model_name': 'llama3.2',
'base_url': 'http://localhost:11434',
'timeout': 120
}
},
'response_parser': {
'type': 'markdown',
'config': {
'extract_citations': True,
'preserve_formatting': True
}
},
'confidence_scorer': {
'type': 'semantic',
'config': {
'relevance_weight': 0.4,
'grounding_weight': 0.4,
'quality_weight': 0.2
}
}
}
# Handle both ComponentFactory style (config=None, all params as kwargs)
# and direct instantiation style (config=dict, minimal kwargs)
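# Illustrative call shapes for the three branches below:
#   AnswerGenerator(config={"llm_client": {"type": "ollama", ...}, ...})   # direct structured config
#   AnswerGenerator(llm_client={"type": "ollama", "config": {...}}, ...)   # ComponentFactory kwargs
#   AnswerGenerator(model_name="llama3.2", use_ollama=True)                # legacy parameters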
# Check if we have structured config passed directly
if config is not None and 'llm_client' in config:
# Direct instantiation with structured config - use as-is
logger.debug("Using provided structured configuration")
final_config = config
elif (config is None and
'llm_client' in kwargs and
isinstance(kwargs['llm_client'], dict) and
'type' in kwargs['llm_client']):
# ComponentFactory style - all params in kwargs including structured llm_client
logger.debug("Converting ComponentFactory parameters to structured config")
final_config = kwargs.copy()
else:
# Legacy parameters - convert to structured format
logger.info("Converting legacy parameters to new configuration format")
final_config = default_config.copy()
# Process legacy parameters
has_legacy_params = (model_name is not None or
temperature is not None or
max_tokens is not None or
use_ollama is not None or
ollama_url is not None)
if has_legacy_params:
# Ensure proper nested structure
final_config.setdefault('llm_client', {})
final_config['llm_client'].setdefault('config', {})
# Handle different LLM providers
if use_ollama is False:
# Check whether a non-Ollama LLM client type is already configured
configured_type = final_config.get('llm_client', {}).get('type')
if configured_type and configured_type != 'ollama':
# Use the configured non-Ollama LLM client type
logger.info(f"Using configured LLM client: {configured_type}")
else:
# No alternative provider available from legacy parameters; fall back to Ollama
logger.warning("Non-Ollama providers not configured, using Ollama")
if model_name:
final_config['llm_client']['config']['model_name'] = model_name
if temperature is not None:
final_config['llm_client']['config']['temperature'] = temperature
if max_tokens is not None:
final_config['llm_client']['config']['max_tokens'] = max_tokens
if ollama_url:
final_config['llm_client']['config']['base_url'] = ollama_url
# Merge with defaults
self.config = self._merge_configs(default_config, final_config)
# Initialize sub-components
self._initialize_components()
# Track metrics
self._generation_count = 0
self._total_time = 0.0
# Platform services (initialized via initialize_services)
self.platform: Optional['PlatformOrchestrator'] = None
logger.info(f"Initialized AnswerGenerator with components: "
f"prompt_builder={self.config['prompt_builder']['type']}, "
f"llm_client={self.config['llm_client']['type']}, "
f"response_parser={self.config['response_parser']['type']}, "
f"confidence_scorer={self.config['confidence_scorer']['type']}")
def generate(self, query: str, context: List[Document]) -> Answer:
"""
Generate an answer from query and context documents.
This method orchestrates all sub-components to produce a high-quality
answer with citations and confidence scoring.
Args:
query: User query string
context: List of relevant context documents
Returns:
Answer object with generated text, sources, confidence, and metadata
Raises:
ValueError: If the query is empty
GenerationError: If answer generation fails
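Example (illustrative; `retrieved_docs` is a hypothetical List[Document]):
answer = generator.generate("How does the boot sequence work?", retrieved_docs)
print(f"{answer.confidence:.2f}: {answer.text}")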
"""
if not query.strip():
raise ValueError("Query cannot be empty")
start_time = time.time()
try:
# Step 1: Build prompt
logger.debug("Building prompt...")
prompt = self.prompt_builder.build_prompt(query, context)
# Step 2: Generate response
logger.debug("Generating response...")
generation_params = self._get_generation_params()
raw_response = self.llm_client.generate(prompt, generation_params)
# Step 3: Parse response
logger.debug("Parsing response...")
parsed_response = self.response_parser.parse(raw_response)
answer_text = parsed_response.get('answer', raw_response)
# Step 4: Extract citations
citations = self.response_parser.extract_citations(parsed_response, context)
# Step 5: Calculate confidence
logger.debug("Calculating confidence...")
confidence = self.confidence_scorer.score(query, answer_text, context)
# Override with parsed confidence if available and higher
if 'confidence' in parsed_response:
confidence = max(confidence, parsed_response['confidence'])
# Step 6: Build metadata
elapsed_time = time.time() - start_time
metadata = self._build_metadata(
query, parsed_response, citations, elapsed_time
)
# Update metrics
self._generation_count += 1
self._total_time += elapsed_time
# Track performance using platform services
if self.platform:
self.platform.track_component_performance(
self,
"answer_generation",
{
"success": True,
"generation_time": elapsed_time,
"confidence": confidence,
"query": query,
"context_docs": len(context),
"answer_length": len(answer_text),
"citations_count": len(citations)
}
)
logger.info(f"Generated answer in {elapsed_time:.2f}s with confidence {confidence:.3f}")
return Answer(
text=answer_text,
sources=context,
confidence=confidence,
metadata=metadata
)
except Exception as e:
# Track failure using platform services
if self.platform:
elapsed_time = time.time() - start_time
self.platform.track_component_performance(
self,
"answer_generation",
{
"success": False,
"generation_time": elapsed_time,
"query": query,
"context_docs": len(context),
"error": str(e)
}
)
logger.error(f"Answer generation failed: {str(e)}")
raise GenerationError(f"Failed to generate answer: {str(e)}") from e
def get_component_info(self) -> Dict[str, Any]:
"""
Get information about active sub-components.
This method is used by ComponentFactory for enhanced logging.
Returns:
Dictionary with component information
"""
return {
'prompt_builder': {
'type': self.config['prompt_builder']['type'],
'class': self.prompt_builder.__class__.__name__,
**self.prompt_builder.get_builder_info()
},
'llm_client': {
'type': self.config['llm_client']['type'],
'class': self.llm_client.__class__.__name__,
**self.llm_client.get_model_info()
},
'response_parser': {
'type': self.config['response_parser']['type'],
'class': self.response_parser.__class__.__name__,
**self.response_parser.get_parser_info()
},
'confidence_scorer': {
'type': self.config['confidence_scorer']['type'],
'class': self.confidence_scorer.__class__.__name__,
**self.confidence_scorer.get_scorer_info()
}
}
def get_generator_info(self) -> Dict[str, Any]:
"""
Get information about the generator configuration.
For compatibility with AdaptiveAnswerGenerator interface.
Returns:
Dictionary with generator configuration and capabilities
"""
model_info = self.llm_client.get_model_info()
generation_params = self._get_generation_params()
return {
'model_name': model_info.get('model', 'unknown'),
'provider': model_info.get('provider', 'unknown'),
'temperature': generation_params.temperature,
'max_tokens': generation_params.max_tokens,
'supports_streaming': model_info.get('supports_streaming', False),
'components': {
'prompt_builder': self.config['prompt_builder']['type'],
'llm_client': self.config['llm_client']['type'],
'response_parser': self.config['response_parser']['type'],
'confidence_scorer': self.config['confidence_scorer']['type']
},
'metrics': {
'generations': self._generation_count,
'avg_time': self._total_time / max(1, self._generation_count)
}
}
def validate_configuration(self) -> bool:
"""
Validate the current configuration.
Returns:
True if configuration is valid
Raises:
ValueError: If configuration is invalid
"""
# Validate all components are properly initialized
if not all([self.prompt_builder, self.llm_client,
self.response_parser, self.confidence_scorer]):
raise ValueError("Not all components are initialized")
# Validate LLM connection
try:
if not self.llm_client.validate_connection():
raise ValueError("LLM connection validation failed")
except Exception as e:
raise ValueError(f"LLM validation error: {str(e)}")
return True
# Standard ComponentBase interface implementation
def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
"""Initialize platform services for the component.
Args:
platform: PlatformOrchestrator instance providing services
"""
self.platform = platform
logger.info("AnswerGenerator initialized with platform services")
def get_health_status(self) -> HealthStatus:
"""Get the current health status of the component.
Returns:
HealthStatus object with component health information
"""
if self.platform:
return self.platform.check_component_health(self)
# Fallback if platform services not initialized
is_healthy = True
issues = []
# Check sub-components
if not self.prompt_builder:
is_healthy = False
issues.append("Prompt builder not initialized")
if not self.llm_client:
is_healthy = False
issues.append("LLM client not initialized")
if not self.response_parser:
is_healthy = False
issues.append("Response parser not initialized")
if not self.confidence_scorer:
is_healthy = False
issues.append("Confidence scorer not initialized")
# Check LLM connection
try:
if self.llm_client and not self.llm_client.validate_connection():
is_healthy = False
issues.append("LLM connection validation failed")
except Exception as e:
is_healthy = False
issues.append(f"LLM validation error: {str(e)}")
return HealthStatus(
is_healthy=is_healthy,
issues=issues,
metrics={
"sub_components": self.get_component_info(),
"generator_info": self.get_generator_info()
},
component_name=self.__class__.__name__
)
def get_metrics(self) -> Dict[str, Any]:
"""Get component-specific metrics.
Returns:
Dictionary containing component metrics
"""
if self.platform:
return self.platform.collect_component_metrics(self)
# Fallback if platform services not initialized
return {
"generation_count": self._generation_count,
"total_time": self._total_time,
"avg_time": self._total_time / max(1, self._generation_count),
"sub_components": self.get_component_info(),
"config": {
"prompt_builder": self.config['prompt_builder']['type'],
"llm_client": self.config['llm_client']['type'],
"response_parser": self.config['response_parser']['type'],
"confidence_scorer": self.config['confidence_scorer']['type']
}
}
def get_capabilities(self) -> List[str]:
"""Get list of component capabilities.
Returns:
List of capability strings
"""
capabilities = [
"answer_generation",
"context_synthesis",
"modular_architecture",
"configurable_prompting",
"confidence_scoring",
"citation_generation"
]
# Add LLM-specific capabilities
if self.llm_client:
capabilities.append(f"llm_{self.config['llm_client']['type']}")
# Add prompt builder capabilities
if self.prompt_builder:
capabilities.append(f"prompt_builder_{self.config['prompt_builder']['type']}")
# Add parser capabilities
if self.response_parser:
capabilities.append(f"response_parser_{self.config['response_parser']['type']}")
# Add scorer capabilities
if self.confidence_scorer:
capabilities.append(f"confidence_scorer_{self.config['confidence_scorer']['type']}")
return capabilities
def _initialize_components(self) -> None:
"""Initialize all sub-components based on configuration."""
# Initialize prompt builder
builder_type = self.config['prompt_builder']['type']
builder_config = self.config['prompt_builder'].get('config', {})
builder_class = get_builder_class(builder_type)
self.prompt_builder = builder_class(**builder_config)
# Initialize LLM adapter
llm_type = self.config['llm_client']['type']
llm_config = self.config['llm_client'].get('config', {})
adapter_class = get_adapter_class(llm_type)
# Separate adapter constructor args from generation config
# Different adapters expect different parameters
adapter_args = {}
generation_config = {}
# Get adapter-specific parameter names from the adapter class
import inspect
adapter_signature = inspect.signature(adapter_class.__init__)
adapter_params = set(adapter_signature.parameters.keys()) - {'self'}
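# Illustrative split (hypothetical adapter): for __init__(self, model_name, base_url,
# timeout, config=None), keys like 'model_name', 'base_url' and 'timeout' become
# constructor args, while keys like 'temperature' or 'max_tokens' are collected
# into generation_config and passed through as 'config'.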
for key, value in llm_config.items():
if key in adapter_params:
adapter_args[key] = value
else:
generation_config[key] = value
# Pass generation config as config parameter if there are any
if generation_config:
adapter_args['config'] = generation_config
self.llm_client = adapter_class(**adapter_args)
# Initialize response parser
parser_type = self.config['response_parser']['type']
parser_config = self.config['response_parser'].get('config', {})
parser_class = get_parser_class(parser_type)
self.response_parser = parser_class(**parser_config)
# Initialize confidence scorer
scorer_type = self.config['confidence_scorer']['type']
scorer_config = self.config['confidence_scorer'].get('config', {})
scorer_class = get_scorer_class(scorer_type)
# For semantic scorer, we'll pass embedder after platform initialization
# For now, initialize without embedder
self.confidence_scorer = scorer_class(**scorer_config)
def _get_generation_params(self) -> GenerationParams:
"""Get generation parameters from configuration."""
llm_config = self.config['llm_client'].get('config', {})
return GenerationParams(
temperature=llm_config.get('temperature', 0.7),
max_tokens=llm_config.get('max_tokens', 512),
top_p=llm_config.get('top_p', 1.0),
frequency_penalty=llm_config.get('frequency_penalty', 0.0),
presence_penalty=llm_config.get('presence_penalty', 0.0),
stop_sequences=llm_config.get('stop_sequences')
)
def _build_metadata(self,
query: str,
parsed_response: Dict[str, Any],
citations: List[Citation],
elapsed_time: float) -> Dict[str, Any]:
"""Build metadata for the answer."""
metadata = {
'generator_type': 'modular',
'generation_time': elapsed_time,
'query_length': len(query),
'citations_found': len(citations),
'components_used': {
'prompt_builder': self.config['prompt_builder']['type'],
'llm_client': self.config['llm_client']['type'],
'response_parser': self.config['response_parser']['type'],
'confidence_scorer': self.config['confidence_scorer']['type']
}
}
# Add parsed metadata
if 'metadata' in parsed_response:
metadata['parsed_metadata'] = parsed_response['metadata']
# Add model info
model_info = self.llm_client.get_model_info()
metadata['model'] = model_info.get('model', 'unknown')
metadata['provider'] = model_info.get('provider', 'unknown')
return metadata
def set_embedder(self, embedder):
"""Set embedder for semantic confidence scoring."""
if hasattr(self.confidence_scorer, 'set_embedder'):
self.confidence_scorer.set_embedder(embedder)
logger.info("Embedder set for semantic confidence scoring")
def _merge_configs(self, default: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""Deep merge configuration dictionaries."""
result = default.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._merge_configs(result[key], value)
else:
result[key] = value
return result