# Arthur Passuello - initial commit - 5e1a30c
"""
Rich Response Assembler Implementation.
This module provides comprehensive response assembly with detailed metadata,
source information, and enhanced formatting for production use.
Features:
- Comprehensive metadata collection
- Citation analysis and validation
- Source document summaries
- Quality metrics and confidence scoring
- Detailed assembly diagnostics
"""
import logging
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add project paths for imports.
# Five successive ``.parent`` steps walk up from this file: the first yields
# the containing directory, the remaining four climb four directories higher.
# The resulting path is appended (lowest priority) to ``sys.path``; presumably
# this is what lets the absolute ``src.core.interfaces`` import below resolve
# -- confirm against the repository layout.
project_root = Path(__file__).parent.parent.parent.parent.parent
sys.path.append(str(project_root))
from ..base import ContextSelection, QueryAnalysis
from .base_assembler import BaseResponseAssembler
from src.core.interfaces import Answer, Document
logger = logging.getLogger(__name__)
class RichAssembler(BaseResponseAssembler):
    """
    Rich response assembler with comprehensive metadata and formatting.

    This assembler creates Answer objects with detailed metadata, source
    summaries, citation analysis, and quality metrics. It's designed for
    production use where comprehensive information is needed.

    Configuration Options:
    - include_source_summaries: Include document summaries (default: True)
    - include_citation_analysis: Analyze citations in answer (default: True)
    - include_quality_metrics: Include quality assessment (default: True)
    - include_debug_info: Include assembly diagnostics (default: False)
    - citation_format: Citation format style ("inline", "numbered", "document");
      any other value is rejected with a warning (default: "inline")

    Several helpers used here (``_create_sources_list``,
    ``_create_base_metadata``, ``_extract_citations_from_text``) and the
    ``_format_citations`` flag are not defined in this class; they are
    expected to come from ``BaseResponseAssembler`` -- confirm there.
    """

    # Citation styles accepted for the ``citation_format`` option.
    _VALID_CITATION_FORMATS = ('inline', 'numbered', 'document')
    _DEFAULT_CITATION_FORMAT = 'inline'

    # Document metadata keys copied verbatim into each source summary.
    _SUMMARY_METADATA_FIELDS = ('page', 'title', 'section', 'quality_score')

    # Number of content characters kept in a source summary preview.
    _PREVIEW_LENGTH = 200

    # Known citation shapes, tested in this order for every citation.
    _CITATION_FORMAT_PATTERNS = (
        ('page_format', re.compile(r'\[Document \d+, Page \d+\]')),
        ('document_format', re.compile(r'\[Document \d+\]')),
        ('chunk_format', re.compile(r'\[chunk_\d+\]')),
        ('simple_format', re.compile(r'\[\d+\]')),
    )

    # Positional references: "[Document N]", "[Document N, Page M]" or "[N]".
    # Group 1 or group 2 captures the 1-based document position N.
    _DOCUMENT_REFERENCE_RE = re.compile(
        r'\[Document\s+(\d+)(?:\s*,\s*Page\s+\d+)?\]|\[(\d+)\]'
    )

    # Substrings probed (case-insensitively) by the quality heuristics.
    _TECHNICAL_TERMS = ('implementation', 'algorithm', 'protocol', 'configuration', 'api')
    _EXAMPLE_PHRASES = ('example', 'for instance', 'such as', 'like')
    _EXPLANATION_PHRASES = ('because', 'since', 'due to', 'this means', 'in other words')

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize rich assembler with configuration.

        Args:
            config: Configuration dictionary; ``None`` means "all defaults"
        """
        # The attributes are assigned before super().__init__ on purpose
        # (kept from the original ordering); presumably the base initializer
        # triggers code that reads them -- confirm in BaseResponseAssembler.
        settings = config or {}
        self._include_source_summaries = settings.get('include_source_summaries', True)
        self._include_citation_analysis = settings.get('include_citation_analysis', True)
        self._include_quality_metrics = settings.get('include_quality_metrics', True)
        self._include_debug_info = settings.get('include_debug_info', False)
        # Validate here exactly like configure() does, so an unsupported
        # value can never be stored by the constructor.
        self._citation_format = self._resolve_citation_format(
            settings.get('citation_format', self._DEFAULT_CITATION_FORMAT),
            self._DEFAULT_CITATION_FORMAT,
        )

        super().__init__(config)

        logger.debug(
            "Initialized RichAssembler with citation_format=%s",
            self._citation_format,
        )

    @classmethod
    def _resolve_citation_format(cls, requested: Any, fallback: str) -> str:
        """
        Return ``requested`` if it is a supported citation format.

        Args:
            requested: Candidate citation format
            fallback: Value to use when the candidate is not supported

        Returns:
            ``requested`` when supported, otherwise ``fallback`` (a warning
            is logged in that case)
        """
        if requested in cls._VALID_CITATION_FORMATS:
            return requested
        logger.warning("Invalid citation_format %s, keeping %s", requested, fallback)
        return fallback

    @staticmethod
    def _resolve_doc_field(
        doc: Any,
        metadata: Dict[str, Any],
        name: str,
        default: Any = None
    ) -> Any:
        """
        Look up a document field on the object first, then in its metadata.

        Only ``None`` and the empty string count as "missing", so falsy but
        meaningful values such as a ``chunk_id`` of ``0`` are preserved.

        Args:
            doc: Source document
            metadata: The document's metadata mapping (may be empty)
            name: Field name to look up
            default: Value returned when the field is missing in both places

        Returns:
            The resolved field value or ``default``
        """
        value = getattr(doc, name, None)
        if value is None or value == '':
            value = metadata.get(name)
        if value is None or value == '':
            return default
        return value

    def _assemble_answer(
        self,
        query: str,
        answer_text: str,
        context: ContextSelection,
        confidence: float,
        query_analysis: Optional[QueryAnalysis] = None,
        generation_metadata: Optional[Dict[str, Any]] = None
    ) -> Answer:
        """
        Assemble comprehensive Answer object with rich metadata.

        Args:
            query: Validated query string
            answer_text: Validated answer text
            context: Context selection
            confidence: Validated confidence score
            query_analysis: Optional query analysis
            generation_metadata: Optional generation metadata

        Returns:
            Answer object with comprehensive metadata
        """
        formatted_text = self._format_answer_text(answer_text)
        sources = self._create_sources_list(context)
        metadata = self._create_base_metadata(
            query, context, query_analysis, generation_metadata
        )

        # Each optional section is added only when its feature flag is set.
        if self._include_source_summaries:
            metadata['source_summaries'] = self._create_source_summaries(
                context.selected_documents
            )

        if self._include_citation_analysis:
            metadata['citation_analysis'] = self._analyze_citations(
                formatted_text, context.selected_documents
            )

        if self._include_quality_metrics:
            metadata['quality_metrics'] = self._calculate_quality_metrics(
                formatted_text, context, confidence, query_analysis
            )

        if self._include_debug_info:
            metadata['debug_info'] = self._create_debug_info(context, generation_metadata)

        # Assembly-specific metadata, always present.
        metadata.update({
            'assembler_version': '1.0',
            'assembly_features': self._get_enabled_features(),
            'answer_length': len(formatted_text),
            'word_count': len(formatted_text.split()),
            'source_count': len(sources),
        })

        return Answer(
            text=formatted_text,
            sources=sources,
            confidence=confidence,
            metadata=metadata
        )

    def _create_source_summaries(self, documents: List[Document]) -> List[Dict[str, Any]]:
        """
        Create summaries for source documents.

        Args:
            documents: Source documents

        Returns:
            List of document summaries, one per document, in input order
        """
        summaries = []

        for index, doc in enumerate(documents):
            # Tolerate documents whose metadata is missing or None.
            doc_metadata = getattr(doc, 'metadata', None) or {}
            content = doc.content

            if len(content) > self._PREVIEW_LENGTH:
                preview = content[:self._PREVIEW_LENGTH] + "..."
            else:
                preview = content

            summary = {
                'index': index,
                'source': self._resolve_doc_field(doc, doc_metadata, 'source', 'unknown'),
                'chunk_id': self._resolve_doc_field(doc, doc_metadata, 'chunk_id', 'unknown'),
                'content_length': len(content),
                'word_count': len(content.split()),
                'preview': preview,
            }

            # Copy the selected metadata fields that this document carries.
            for field in self._SUMMARY_METADATA_FIELDS:
                if field in doc_metadata:
                    summary[field] = doc_metadata[field]

            # Relevance score is optional on the document object.
            if hasattr(doc, 'score'):
                summary['relevance_score'] = doc.score

            summaries.append(summary)

        return summaries

    def _analyze_citations(self, answer_text: str, documents: List[Document]) -> Dict[str, Any]:
        """
        Analyze citations in the answer text.

        Args:
            answer_text: Answer text to analyze
            documents: Source documents used

        Returns:
            Citation analysis results (counts, density, validation, patterns)
        """
        citations_found = self._extract_citations_from_text(answer_text)
        citation_count = len(citations_found)
        # max(1, ...) avoids dividing by zero for an empty answer.
        word_count = max(1, len(answer_text.split()))

        return {
            'citations_found': citations_found,
            'citation_count': citation_count,
            'has_citations': citation_count > 0,
            'citation_density': citation_count / word_count * 100,  # Citations per 100 words
            'validation': self._validate_citations(citations_found, documents),
            'patterns': self._analyze_citation_patterns(citations_found),
        }

    @classmethod
    def _cites_known_document(cls, citation: Any, document_count: int) -> bool:
        """
        Tell whether a positional citation points at an available document.

        Args:
            citation: Citation string such as "[2]" or "[Document 2, Page 5]"
            document_count: Number of available source documents

        Returns:
            True when the citation is positional and its 1-based document
            number lies within ``1..document_count``
        """
        if not isinstance(citation, str):
            return False
        match = cls._DOCUMENT_REFERENCE_RE.fullmatch(citation)
        if match is None:
            return False
        position = int(match.group(1) or match.group(2))
        return 1 <= position <= document_count

    def _validate_citations(self, citations: List[str], documents: List[Document]) -> Dict[str, Any]:
        """
        Validate that citations reference available documents.

        A citation is valid when it is "[<chunk_id>]" for one of the
        documents, or a positional reference ("[N]", "[Document N]",
        "[Document N, Page M]") whose N is within the document list. The
        page number of a page-style citation is not checked.

        Args:
            citations: List of citation strings
            documents: Available source documents

        Returns:
            Citation validation results
        """
        # References by chunk id; an id of 0 is a legitimate value.
        chunk_refs = set()
        for doc in documents:
            doc_metadata = getattr(doc, 'metadata', None) or {}
            chunk_id = self._resolve_doc_field(doc, doc_metadata, 'chunk_id')
            if chunk_id is not None:
                chunk_refs.add(f"[{chunk_id}]")

        document_count = len(documents)
        valid_citations = []
        invalid_citations = []
        for citation in citations:
            if citation in chunk_refs or self._cites_known_document(citation, document_count):
                valid_citations.append(citation)
            else:
                invalid_citations.append(citation)

        total_citations = len(citations)
        validation_rate = len(valid_citations) / total_citations if total_citations > 0 else 0.0

        return {
            'valid_citations': valid_citations,
            'invalid_citations': invalid_citations,
            'validation_rate': validation_rate,
        }

    def _analyze_citation_patterns(self, citations: List[str]) -> Dict[str, Any]:
        """
        Analyze patterns in citation usage.

        Args:
            citations: List of citation strings

        Returns:
            Pattern analysis results; ``dominant_format`` is 'none' when
            there are no citations or none of them matches a known format
        """
        format_counts = {
            'document_format': 0,  # [Document N]
            'simple_format': 0,    # [N]
            'chunk_format': 0,     # [chunk_N]
            'page_format': 0       # [Document N, Page N]
        }

        for citation in citations:
            # First matching shape wins; unmatched citations are not counted.
            for format_name, pattern in self._CITATION_FORMAT_PATTERNS:
                if pattern.match(citation):
                    format_counts[format_name] += 1
                    break

        highest_count = max(format_counts.values())
        if highest_count > 0:
            # Ties resolve to the first format in dictionary order.
            dominant_format = max(format_counts.items(), key=lambda item: item[1])[0]
        else:
            dominant_format = 'none'

        return {
            'format_counts': format_counts,
            'dominant_format': dominant_format,
            'format_consistency': highest_count / max(1, len(citations))
        }

    def _calculate_quality_metrics(
        self,
        answer_text: str,
        context: ContextSelection,
        confidence: float,
        query_analysis: Optional[QueryAnalysis] = None
    ) -> Dict[str, Any]:
        """
        Calculate quality metrics for the assembled answer.

        Args:
            answer_text: Generated answer text
            context: Context selection used
            confidence: Answer confidence score
            query_analysis: Optional query analysis (currently unused)

        Returns:
            Quality metrics dictionary
        """
        lowered = answer_text.lower()
        word_count = len(answer_text.split())
        # Rough sentence estimate: every '.', '!' and '?' counts as one.
        sentence_count = sum(answer_text.count(mark) for mark in ('.', '!', '?'))
        sources_used = len(context.selected_documents)

        metrics = {
            # Text quality metrics
            'answer_length': len(answer_text),
            'word_count': word_count,
            'sentence_count': sentence_count,
            'avg_sentence_length': word_count / sentence_count if sentence_count > 0 else 0.0,
            # Content quality indicators (plain substring probes)
            'has_technical_content': any(term in lowered for term in self._TECHNICAL_TERMS),
            'has_examples': any(phrase in lowered for phrase in self._EXAMPLE_PHRASES),
            'has_explanations': any(phrase in lowered for phrase in self._EXPLANATION_PHRASES),
            # Source utilization metrics
            'sources_used': sources_used,
            'token_efficiency': context.total_tokens / max(1, len(answer_text)),
        }

        relevance_score = getattr(context, 'relevance_score', None)
        if relevance_score is not None:
            metrics['source_relevance'] = relevance_score

        diversity_score = getattr(context, 'diversity_score', None)
        if diversity_score is not None:
            metrics['source_diversity'] = diversity_score

        # Overall quality score: unweighted mean of the components below.
        quality_components = [
            confidence,                                           # LLM confidence
            min(1.0, word_count / 50),                            # Length adequacy (up to 50 words)
            1.0 if metrics['has_technical_content'] else 0.5,     # Technical content
            1.0 if metrics['has_explanations'] else 0.7,          # Explanatory content
            min(1.0, sources_used / 3)                            # Source utilization (up to 3 sources)
        ]
        metrics['overall_quality'] = sum(quality_components) / len(quality_components)

        return metrics

    def _create_debug_info(
        self,
        context: ContextSelection,
        generation_metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Create debug information for troubleshooting.

        Args:
            context: Context selection
            generation_metadata: Generation metadata

        Returns:
            Debug information dictionary
        """
        debug_info = {
            'context_metadata': context.metadata,
            'selection_strategy': context.selection_strategy,
            'total_tokens': context.total_tokens
        }

        # Omitted entirely when no (or empty) generation metadata is given.
        if generation_metadata:
            debug_info['generation_metadata'] = generation_metadata

        debug_info['assembler_config'] = {
            'include_source_summaries': self._include_source_summaries,
            'include_citation_analysis': self._include_citation_analysis,
            'include_quality_metrics': self._include_quality_metrics,
            'citation_format': self._citation_format
        }

        return debug_info

    def _get_enabled_features(self) -> List[str]:
        """
        Get list of enabled rich assembler features.

        Returns:
            List of enabled feature names
        """
        # _format_citations is not assigned in this class; presumably it is
        # initialised by BaseResponseAssembler -- confirm.
        feature_flags = (
            ('source_summaries', self._include_source_summaries),
            ('citation_analysis', self._include_citation_analysis),
            ('quality_metrics', self._include_quality_metrics),
            ('debug_info', self._include_debug_info),
            ('citation_formatting', self._format_citations),
        )
        return [name for name, enabled in feature_flags if enabled]

    def get_supported_formats(self) -> List[str]:
        """
        Return list of formats this rich assembler supports.

        Returns:
            The base class formats followed by the rich format names
        """
        base_formats = super().get_supported_formats()
        rich_formats = [
            'rich',
            'comprehensive',
            'detailed',
            'production'
        ]
        return base_formats + rich_formats

    def configure(self, config: Dict[str, Any]) -> None:
        """
        Configure the rich assembler with provided settings.

        Keys absent from ``config`` leave the current setting untouched. An
        unsupported ``citation_format`` is ignored with a warning.

        Args:
            config: Configuration dictionary
        """
        super().configure(config)

        self._include_source_summaries = config.get(
            'include_source_summaries', self._include_source_summaries
        )
        self._include_citation_analysis = config.get(
            'include_citation_analysis', self._include_citation_analysis
        )
        self._include_quality_metrics = config.get(
            'include_quality_metrics', self._include_quality_metrics
        )
        self._include_debug_info = config.get('include_debug_info', self._include_debug_info)

        self._citation_format = self._resolve_citation_format(
            config.get('citation_format', self._citation_format),
            self._citation_format,
        )

    def _format_answer_text(self, answer_text: str) -> str:
        """
        Format answer text with rich formatting options.

        Args:
            answer_text: Raw answer text

        Returns:
            Formatted answer text
        """
        # Base formatting first, then the optional citation pass.
        formatted = super()._format_answer_text(answer_text)

        # _format_citations is presumably set by the base class -- confirm.
        if self._format_citations:
            formatted = self._apply_citation_formatting(formatted)

        return formatted

    def _apply_citation_formatting(self, text: str) -> str:
        """
        Apply citation formatting based on configuration.

        Currently a placeholder: the text is returned unchanged.

        Args:
            text: Text with citations

        Returns:
            Text with formatted citations
        """
        # Can be extended based on specific formatting requirements.
        return text