# enhanced-rag-demo/src/core/query_processor.py
# Arthur Passuello
# initial commit
# 5e1a30c
"""
Query Processor - Handles query execution workflow.
This component focuses solely on query processing logic,
separated from platform orchestration concerns. In Phase 1,
it extracts the query logic from RAGPipeline.
"""
import logging
from typing import List, Dict, Any, Optional
from .interfaces import Answer, RetrievalResult, Retriever, AnswerGenerator, Document
logger = logging.getLogger(__name__)
class QueryProcessor:
    """
    Query Processor handles the query execution workflow.

    Responsibilities:
    - Query analysis and enhancement
    - Retrieval orchestration
    - Context selection and ranking
    - Answer generation coordination
    - Response assembly

    In Phase 1, this component works with component references passed
    during initialization. In Phase 3, it will be enhanced with direct
    wiring and additional features like caching.

    Recognised configuration keys (all optional):
        retrieval_k: Number of documents requested from the retriever when
            the caller does not supply ``k`` (default 5).
        min_confidence: Minimum retrieval score a result needs in order to
            be used as context (default 0.0).
        fallback_k: Number of leading retrieval results used when nothing
            passes ``min_confidence``, and when the generated answer carries
            no sources of its own (default 3).
    """

    # Default for the 'fallback_k' configuration key.
    DEFAULT_FALLBACK_K = 3

    def __init__(
        self,
        retriever: Retriever,
        generator: AnswerGenerator,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize query processor with component references.

        Args:
            retriever: Retriever component for document search
            generator: Answer generator component for response generation
            config: Optional configuration dictionary (see class docstring
                for the keys that are read)
        """
        self.retriever = retriever
        self.generator = generator
        # Keep a private copy so that later mutation of the caller's dict
        # cannot change what explain_query() / repr() report.
        self.config = dict(config) if config else {}

        # Extract configuration values
        self.default_k = self.config.get('retrieval_k', 5)
        self.min_confidence = self.config.get('min_confidence', 0.0)
        self.fallback_k = self.config.get('fallback_k', self.DEFAULT_FALLBACK_K)

        logger.info("Query processor initialized")

    def process(self, query: str, k: Optional[int] = None) -> Answer:
        """
        Process a query and return an answer.

        This method orchestrates the query processing workflow:
        1. Analyze query (placeholder for future enhancement)
        2. Retrieve relevant documents
        3. Select and rank context
        4. Generate answer
        5. Assemble final response

        Args:
            query: User query string
            k: Number of documents to retrieve; the configured default is
                used when this is None or 0

        Returns:
            Answer object with generated text, sources, and metadata

        Raises:
            ValueError: If query is not a string, or is empty or
                whitespace-only
            RuntimeError: If any step of query processing fails; the
                original exception is attached as ``__cause__``
        """
        # Checking the type first turns None / non-string input into the
        # documented ValueError instead of an AttributeError on .strip().
        if not isinstance(query, str) or not query.strip():
            raise ValueError("Query cannot be empty")

        k = k or self.default_k
        logger.info("Processing query: %s...", query[:100])

        try:
            # Step 1: Analyze query (future enhancement placeholder)
            analyzed_query = self._analyze_query(query)

            # Step 2: Retrieve relevant documents
            retrieval_results = self._retrieve_documents(analyzed_query, k)

            # Step 3: Select and rank context
            context_docs = self._select_context(retrieval_results)

            # Step 4: Generate answer (uses the original, un-analyzed query)
            answer = self._generate_answer(query, context_docs)

            # Step 5: Assemble final response
            final_answer = self._assemble_response(
                answer, retrieval_results, query
            )

            logger.info(
                "Query processed successfully with confidence: %s",
                final_answer.confidence,
            )
            return final_answer
        except Exception as e:
            logger.error("Query processing failed: %s", e)
            raise RuntimeError(f"Query processing failed: {str(e)}") from e

    def _analyze_query(self, query: str) -> str:
        """
        Analyze and potentially enhance the query.

        In Phase 1, this is a placeholder that returns the query as-is.
        In Phase 3, this will be enhanced with actual query analysis.

        Args:
            query: Original user query

        Returns:
            Analyzed/enhanced query
        """
        # Placeholder for future enhancement
        return query

    def _retrieve_documents(self, query: str, k: int) -> List[RetrievalResult]:
        """
        Retrieve relevant documents for the query.

        Args:
            query: Query string
            k: Number of documents to retrieve

        Returns:
            List of retrieval results; an empty list when the retriever
            returns nothing (including None)
        """
        logger.debug("Retrieving %s documents for query: %s...", k, query[:50])

        # Normalise to a real list: guards against a retriever returning
        # None or a non-list iterable, so the len() and slicing done by the
        # later pipeline steps cannot fail.
        results = list(self.retriever.retrieve(query, k) or [])

        if not results:
            logger.warning("No documents retrieved for query: %s", query)
        else:
            logger.debug(
                "Retrieved %s documents with scores: %s...",
                len(results),
                [r.score for r in results[:3]],
            )
        return results

    def _select_context(self, results: List[RetrievalResult]) -> List[Document]:
        """
        Select and rank context documents.

        In Phase 1, this applies basic filtering based on confidence.
        In Phase 3, this will be enhanced with re-ranking and other features.

        Args:
            results: List of retrieval results

        Returns:
            Documents of the results whose score is at least
            ``min_confidence``. If none qualify, the documents of the first
            ``fallback_k`` results are returned instead. Empty list when
            ``results`` is empty.
        """
        if not results:
            return []

        # Apply minimum confidence filtering
        selected = [r for r in results if r.score >= self.min_confidence]

        if not selected:
            logger.warning(
                "No results passed confidence threshold of %s",
                self.min_confidence,
            )
            # Return leading results anyway if nothing passes the threshold.
            # NOTE(review): presumably the retriever orders results
            # best-first, making these the top results - confirm.
            selected = results[:self.fallback_k]

        # Extract documents
        context_docs = [r.document for r in selected]
        logger.debug("Selected %s context documents", len(context_docs))
        return context_docs

    def _generate_answer(self, query: str, context_docs: List[Document]) -> Answer:
        """
        Generate answer from query and context.

        Args:
            query: User query
            context_docs: List of context documents

        Returns:
            Generated answer. When ``context_docs`` is empty the generator is
            not called and a fixed zero-confidence answer is returned.
        """
        if not context_docs:
            return Answer(
                text="No relevant information found for your query.",
                sources=[],
                confidence=0.0,
                metadata={
                    "query": query,
                    "context_docs": 0,
                    "processor": "QueryProcessor"
                }
            )

        logger.debug(
            "Generating answer with %s context documents", len(context_docs)
        )

        # Use answer generator
        answer = self.generator.generate(query, context_docs)

        # Add processor metadata
        if not answer.metadata:
            answer.metadata = {}
        answer.metadata["processor"] = "QueryProcessor"
        return answer

    def _assemble_response(
        self,
        answer: Answer,
        retrieval_results: List[RetrievalResult],
        query: str
    ) -> Answer:
        """
        Assemble the final response with metadata.

        The answer object is modified in place and also returned.

        Args:
            answer: Generated answer
            retrieval_results: List of retrieval results
            query: Original query

        Returns:
            Final answer with complete metadata
        """
        # Tolerate missing inputs so this step does not depend on
        # _generate_answer / _retrieve_documents having normalised them.
        retrieval_results = retrieval_results or []
        if answer.metadata is None:
            answer.metadata = {}

        # Add query processing metadata
        answer.metadata.update({
            "query": query,
            "retrieved_docs": len(retrieval_results),
            "retrieval_scores": [r.score for r in retrieval_results],
            "retrieval_methods": [r.retrieval_method for r in retrieval_results],
            "query_processor_config": {
                "default_k": self.default_k,
                "min_confidence": self.min_confidence
            }
        })

        # If the generator reported no sources, fall back to the documents
        # of the leading retrieval results.
        if not answer.sources and retrieval_results:
            answer.sources = [
                r.document for r in retrieval_results[:self.fallback_k]
            ]
        return answer

    def explain_query(self, query: str) -> Dict[str, Any]:
        """
        Explain how a query would be processed.

        This method provides transparency into the query processing pipeline.
        Nothing is retrieved or generated.

        Args:
            query: Query to explain

        Returns:
            Dictionary with query processing plan
        """
        return {
            "original_query": query,
            "analyzed_query": self._analyze_query(query),
            "retrieval_k": self.default_k,
            "min_confidence": self.min_confidence,
            "processing_steps": [
                "1. Query analysis (currently passthrough)",
                "2. Document retrieval using configured retriever",
                "3. Context selection with confidence filtering",
                "4. Answer generation using configured generator",
                "5. Response assembly with metadata"
            ],
            # Shallow copy: callers editing the explanation must not be able
            # to alter the processor's own configuration.
            "config": dict(self.config)
        }

    def __str__(self) -> str:
        """String representation of the query processor."""
        return (
            f"QueryProcessor(retriever={type(self.retriever).__name__}, "
            f"generator={type(self.generator).__name__})"
        )

    def __repr__(self) -> str:
        """Detailed representation of the query processor."""
        return (f"QueryProcessor(retriever={repr(self.retriever)}, "
                f"generator={repr(self.generator)}, "
                f"config={self.config})")