""" Rich Response Assembler Implementation. This module provides comprehensive response assembly with detailed metadata, source information, and enhanced formatting for production use. Features: - Comprehensive metadata collection - Citation analysis and validation - Source document summaries - Quality metrics and confidence scoring - Detailed assembly diagnostics """ import logging from typing import Dict, Any, List, Optional from pathlib import Path import sys # Add project paths for imports project_root = Path(__file__).parent.parent.parent.parent.parent sys.path.append(str(project_root)) from ..base import ContextSelection, QueryAnalysis from .base_assembler import BaseResponseAssembler from src.core.interfaces import Answer, Document logger = logging.getLogger(__name__) class RichAssembler(BaseResponseAssembler): """ Rich response assembler with comprehensive metadata and formatting. This assembler creates Answer objects with detailed metadata, source summaries, citation analysis, and quality metrics. It's designed for production use where comprehensive information is needed. Configuration Options: - include_source_summaries: Include document summaries (default: True) - include_citation_analysis: Analyze citations in answer (default: True) - include_quality_metrics: Include quality assessment (default: True) - include_debug_info: Include assembly diagnostics (default: False) - citation_format: Citation format style ("inline", "numbered", "document") """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize rich assembler with configuration. Args: config: Configuration dictionary """ # Initialize attributes first before calling super().__init__ config_dict = config or {} self._include_source_summaries = config_dict.get('include_source_summaries', True) self._include_citation_analysis = config_dict.get('include_citation_analysis', True) self._include_quality_metrics = config_dict.get('include_quality_metrics', True) self._include_debug_info = config_dict.get('include_debug_info', False) self._citation_format = config_dict.get('citation_format', 'inline') super().__init__(config) logger.debug(f"Initialized RichAssembler with citation_format={self._citation_format}") def _assemble_answer( self, query: str, answer_text: str, context: ContextSelection, confidence: float, query_analysis: Optional[QueryAnalysis] = None, generation_metadata: Optional[Dict[str, Any]] = None ) -> Answer: """ Assemble comprehensive Answer object with rich metadata. Args: query: Validated query string answer_text: Validated answer text context: Context selection confidence: Validated confidence score query_analysis: Optional query analysis generation_metadata: Optional generation metadata Returns: Answer object with comprehensive metadata """ # Format answer text formatted_text = self._format_answer_text(answer_text) # Create sources list sources = self._create_sources_list(context) # Create base metadata metadata = self._create_base_metadata(query, context, query_analysis, generation_metadata) # Add rich metadata if self._include_source_summaries: metadata['source_summaries'] = self._create_source_summaries(context.selected_documents) if self._include_citation_analysis: citation_analysis = self._analyze_citations(formatted_text, context.selected_documents) metadata['citation_analysis'] = citation_analysis if self._include_quality_metrics: quality_metrics = self._calculate_quality_metrics( formatted_text, context, confidence, query_analysis ) metadata['quality_metrics'] = quality_metrics if self._include_debug_info: debug_info = self._create_debug_info(context, generation_metadata) metadata['debug_info'] = debug_info # Add assembly-specific metadata metadata.update({ 'assembler_version': '1.0', 'assembly_features': self._get_enabled_features(), 'answer_length': len(formatted_text), 'word_count': len(formatted_text.split()), 'source_count': len(sources) }) return Answer( text=formatted_text, sources=sources, confidence=confidence, metadata=metadata ) def _create_source_summaries(self, documents: List[Document]) -> List[Dict[str, Any]]: """ Create summaries for source documents. Args: documents: Source documents Returns: List of document summaries """ summaries = [] for i, doc in enumerate(documents): # Get source and chunk_id from metadata or attributes source = getattr(doc, 'source', None) or doc.metadata.get('source', 'unknown') chunk_id = getattr(doc, 'chunk_id', None) or doc.metadata.get('chunk_id', 'unknown') summary = { 'index': i, 'source': source, 'chunk_id': chunk_id, 'content_length': len(doc.content), 'word_count': len(doc.content.split()), 'preview': doc.content[:200] + "..." if len(doc.content) > 200 else doc.content } # Add document metadata if available if doc.metadata: # Extract useful metadata fields metadata_fields = ['page', 'title', 'section', 'quality_score'] for field in metadata_fields: if field in doc.metadata: summary[field] = doc.metadata[field] # Add relevance score if available if hasattr(doc, 'score'): summary['relevance_score'] = doc.score summaries.append(summary) return summaries def _analyze_citations(self, answer_text: str, documents: List[Document]) -> Dict[str, Any]: """ Analyze citations in the answer text. Args: answer_text: Answer text to analyze documents: Source documents used Returns: Citation analysis results """ citations_found = self._extract_citations_from_text(answer_text) analysis = { 'citations_found': citations_found, 'citation_count': len(citations_found), 'has_citations': len(citations_found) > 0, 'citation_density': len(citations_found) / max(1, len(answer_text.split())) * 100, # Citations per 100 words } # Validate citations against available sources validation_results = self._validate_citations(citations_found, documents) analysis['validation'] = validation_results # Analyze citation patterns pattern_analysis = self._analyze_citation_patterns(citations_found) analysis['patterns'] = pattern_analysis return analysis def _validate_citations(self, citations: List[str], documents: List[Document]) -> Dict[str, Any]: """ Validate that citations reference available documents. Args: citations: List of citation strings documents: Available source documents Returns: Citation validation results """ validation = { 'valid_citations': [], 'invalid_citations': [], 'validation_rate': 0.0 } # Create mapping of available document references available_refs = set() for i, doc in enumerate(documents): # Common reference formats available_refs.add(f"[Document {i+1}]") available_refs.add(f"[{i+1}]") # Get chunk_id from attribute or metadata chunk_id = getattr(doc, 'chunk_id', None) or doc.metadata.get('chunk_id', None) if chunk_id: available_refs.add(f"[{chunk_id}]") # Validate each citation for citation in citations: if citation in available_refs: validation['valid_citations'].append(citation) else: validation['invalid_citations'].append(citation) # Calculate validation rate total_citations = len(citations) if total_citations > 0: validation['validation_rate'] = len(validation['valid_citations']) / total_citations return validation def _analyze_citation_patterns(self, citations: List[str]) -> Dict[str, Any]: """ Analyze patterns in citation usage. Args: citations: List of citation strings Returns: Pattern analysis results """ import re patterns = { 'document_format': 0, # [Document N] 'simple_format': 0, # [N] 'chunk_format': 0, # [chunk_N] 'page_format': 0 # [Document N, Page N] } for citation in citations: if re.match(r'\[Document \d+, Page \d+\]', citation): patterns['page_format'] += 1 elif re.match(r'\[Document \d+\]', citation): patterns['document_format'] += 1 elif re.match(r'\[chunk_\d+\]', citation): patterns['chunk_format'] += 1 elif re.match(r'\[\d+\]', citation): patterns['simple_format'] += 1 # Determine dominant pattern dominant_pattern = max(patterns.items(), key=lambda x: x[1])[0] if citations else 'none' return { 'format_counts': patterns, 'dominant_format': dominant_pattern, 'format_consistency': max(patterns.values()) / max(1, len(citations)) } def _calculate_quality_metrics( self, answer_text: str, context: ContextSelection, confidence: float, query_analysis: Optional[QueryAnalysis] = None ) -> Dict[str, Any]: """ Calculate quality metrics for the assembled answer. Args: answer_text: Generated answer text context: Context selection used confidence: Answer confidence score query_analysis: Optional query analysis Returns: Quality metrics dictionary """ metrics = {} # Text quality metrics metrics['answer_length'] = len(answer_text) metrics['word_count'] = len(answer_text.split()) metrics['sentence_count'] = answer_text.count('.') + answer_text.count('!') + answer_text.count('?') # Calculate average sentence length if metrics['sentence_count'] > 0: metrics['avg_sentence_length'] = metrics['word_count'] / metrics['sentence_count'] else: metrics['avg_sentence_length'] = 0.0 # Content quality indicators metrics['has_technical_content'] = any( term in answer_text.lower() for term in ['implementation', 'algorithm', 'protocol', 'configuration', 'api'] ) metrics['has_examples'] = any( phrase in answer_text.lower() for phrase in ['example', 'for instance', 'such as', 'like'] ) metrics['has_explanations'] = any( phrase in answer_text.lower() for phrase in ['because', 'since', 'due to', 'this means', 'in other words'] ) # Source utilization metrics metrics['sources_used'] = len(context.selected_documents) metrics['token_efficiency'] = context.total_tokens / max(1, len(answer_text)) if hasattr(context, 'relevance_score') and context.relevance_score is not None: metrics['source_relevance'] = context.relevance_score if hasattr(context, 'diversity_score') and context.diversity_score is not None: metrics['source_diversity'] = context.diversity_score # Overall quality score (0.0 - 1.0) quality_components = [ confidence, # LLM confidence min(1.0, metrics['word_count'] / 50), # Length adequacy (up to 50 words) 1.0 if metrics['has_technical_content'] else 0.5, # Technical content 1.0 if metrics['has_explanations'] else 0.7, # Explanatory content min(1.0, metrics['sources_used'] / 3) # Source utilization (up to 3 sources) ] metrics['overall_quality'] = sum(quality_components) / len(quality_components) return metrics def _create_debug_info( self, context: ContextSelection, generation_metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Create debug information for troubleshooting. Args: context: Context selection generation_metadata: Generation metadata Returns: Debug information dictionary """ debug_info = { 'context_metadata': context.metadata, 'selection_strategy': context.selection_strategy, 'total_tokens': context.total_tokens } if generation_metadata: debug_info['generation_metadata'] = generation_metadata # Add assembler configuration debug_info['assembler_config'] = { 'include_source_summaries': self._include_source_summaries, 'include_citation_analysis': self._include_citation_analysis, 'include_quality_metrics': self._include_quality_metrics, 'citation_format': self._citation_format } return debug_info def _get_enabled_features(self) -> List[str]: """ Get list of enabled rich assembler features. Returns: List of enabled feature names """ features = [] if self._include_source_summaries: features.append('source_summaries') if self._include_citation_analysis: features.append('citation_analysis') if self._include_quality_metrics: features.append('quality_metrics') if self._include_debug_info: features.append('debug_info') if self._format_citations: features.append('citation_formatting') return features def get_supported_formats(self) -> List[str]: """ Return list of formats this rich assembler supports. Returns: List of format names """ base_formats = super().get_supported_formats() rich_formats = [ 'rich', 'comprehensive', 'detailed', 'production' ] return base_formats + rich_formats def configure(self, config: Dict[str, Any]) -> None: """ Configure the rich assembler with provided settings. Args: config: Configuration dictionary """ super().configure(config) # Update rich assembler specific configuration self._include_source_summaries = config.get('include_source_summaries', self._include_source_summaries) self._include_citation_analysis = config.get('include_citation_analysis', self._include_citation_analysis) self._include_quality_metrics = config.get('include_quality_metrics', self._include_quality_metrics) self._include_debug_info = config.get('include_debug_info', self._include_debug_info) # Validate citation format valid_formats = ['inline', 'numbered', 'document'] new_format = config.get('citation_format', self._citation_format) if new_format in valid_formats: self._citation_format = new_format else: logger.warning(f"Invalid citation_format {new_format}, keeping {self._citation_format}") def _format_answer_text(self, answer_text: str) -> str: """ Format answer text with rich formatting options. Args: answer_text: Raw answer text Returns: Formatted answer text """ # Base formatting formatted = super()._format_answer_text(answer_text) # Additional rich formatting if self._format_citations: formatted = self._apply_citation_formatting(formatted) return formatted def _apply_citation_formatting(self, text: str) -> str: """ Apply citation formatting based on configuration. Args: text: Text with citations Returns: Text with formatted citations """ # This is a placeholder for citation formatting # Can be extended based on specific formatting requirements return text