""" | |
Query Processor - Handles query execution workflow. | |
This component focuses solely on query processing logic, | |
separated from platform orchestration concerns. In Phase 1, | |
it extracts the query logic from RAGPipeline. | |
""" | |
import logging | |
from typing import List, Dict, Any, Optional | |
from .interfaces import Answer, RetrievalResult, Retriever, AnswerGenerator, Document | |
logger = logging.getLogger(__name__) | |
class QueryProcessor:
    """
    Query Processor handles the query execution workflow.

    Responsibilities:
    - Query analysis and enhancement
    - Retrieval orchestration
    - Context selection and ranking
    - Answer generation coordination
    - Response assembly

    In Phase 1, this component works with component references passed
    during initialization. In Phase 3, it will be enhanced with direct
    wiring and additional features such as caching.
    """

    def __init__(
        self,
        retriever: Retriever,
        generator: AnswerGenerator,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize the query processor with component references.

        Args:
            retriever: Retriever component for document search
            generator: Answer generator component for response generation
            config: Optional configuration dictionary
        """
        self.retriever = retriever
        self.generator = generator
        self.config = config or {}

        # Extract configuration values
        self.default_k = self.config.get('retrieval_k', 5)
        self.min_confidence = self.config.get('min_confidence', 0.0)

        logger.info("Query processor initialized")

    def process(self, query: str, k: Optional[int] = None) -> Answer:
        """
        Process a query and return an answer.

        This method orchestrates the query processing workflow:
        1. Analyze the query (placeholder for future enhancement)
        2. Retrieve relevant documents
        3. Select and rank context
        4. Generate an answer
        5. Assemble the final response

        Args:
            query: User query string
            k: Number of documents to retrieve (uses the default if None)

        Returns:
            Answer object with generated text, sources, and metadata

        Raises:
            ValueError: If the query is empty
            RuntimeError: If query processing fails
        """
        if not query.strip():
            raise ValueError("Query cannot be empty")

        # Fall back to the configured default only when k is unset,
        # so an explicit k=0 is not silently replaced.
        k = k if k is not None else self.default_k
        logger.info(f"Processing query: {query[:100]}...")

        try:
            # Step 1: Analyze query (future enhancement placeholder)
            analyzed_query = self._analyze_query(query)

            # Step 2: Retrieve relevant documents
            retrieval_results = self._retrieve_documents(analyzed_query, k)

            # Step 3: Select and rank context
            context_docs = self._select_context(retrieval_results)

            # Step 4: Generate answer
            answer = self._generate_answer(query, context_docs)

            # Step 5: Assemble final response
            final_answer = self._assemble_response(
                answer, retrieval_results, query
            )

            logger.info(f"Query processed successfully with confidence: {final_answer.confidence}")
            return final_answer

        except Exception as e:
            logger.error(f"Query processing failed: {e}")
            raise RuntimeError(f"Query processing failed: {e}") from e

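    # Call sketch (hypothetical wiring): given a constructed processor,
    #
    #     answer = processor.process("How do I reset my password?", k=3)
    #     print(answer.text, answer.confidence)
    #
    # An empty query raises ValueError before any retrieval happens;
    # downstream failures surface as RuntimeError with the original
    # exception chained via `from e`.
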
    def _analyze_query(self, query: str) -> str:
        """
        Analyze and potentially enhance the query.

        In Phase 1, this is a placeholder that returns the query as-is.
        In Phase 3, it will be enhanced with actual query analysis.

        Args:
            query: Original user query

        Returns:
            Analyzed/enhanced query
        """
        # Placeholder for future enhancement
        return query

    def _retrieve_documents(self, query: str, k: int) -> List[RetrievalResult]:
        """
        Retrieve relevant documents for the query.

        Args:
            query: Query string
            k: Number of documents to retrieve

        Returns:
            List of retrieval results
        """
        logger.debug(f"Retrieving {k} documents for query: {query[:50]}...")

        results = self.retriever.retrieve(query, k)

        if not results:
            logger.warning(f"No documents retrieved for query: {query}")
        else:
            logger.debug(f"Retrieved {len(results)} documents with scores: "
                         f"{[r.score for r in results[:3]]}...")

        return results

    def _select_context(self, results: List[RetrievalResult]) -> List[Document]:
        """
        Select and rank context documents.

        In Phase 1, this applies basic confidence filtering.
        In Phase 3, it will be enhanced with re-ranking and other features.

        Args:
            results: List of retrieval results

        Returns:
            List of selected context documents
        """
        if not results:
            return []

        # Apply minimum confidence filtering
        filtered_results = [
            r for r in results
            if r.score >= self.min_confidence
        ]

        if not filtered_results:
            logger.warning(f"No results passed confidence threshold of {self.min_confidence}")
            # Fall back to the top results if nothing passes the threshold
            # (results is known to be non-empty at this point)
            filtered_results = results[:3]

        # Extract documents
        context_docs = [r.document for r in filtered_results]

        logger.debug(f"Selected {len(context_docs)} context documents")
        return context_docs

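    # Filtering sketch (illustrative scores, not real data): with
    # min_confidence = 0.5 and retrieval scores [0.9, 0.6, 0.3], the first
    # two results pass; with scores [0.3, 0.2, 0.1], nothing passes, so the
    # top three results are returned as a fallback rather than an empty
    # context.
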
    def _generate_answer(self, query: str, context_docs: List[Document]) -> Answer:
        """
        Generate an answer from the query and context.

        Args:
            query: User query
            context_docs: List of context documents

        Returns:
            Generated answer
        """
        if not context_docs:
            return Answer(
                text="No relevant information found for your query.",
                sources=[],
                confidence=0.0,
                metadata={
                    "query": query,
                    "context_docs": 0,
                    "processor": "QueryProcessor"
                }
            )

        logger.debug(f"Generating answer with {len(context_docs)} context documents")

        # Use the answer generator
        answer = self.generator.generate(query, context_docs)

        # Add processor metadata
        if not answer.metadata:
            answer.metadata = {}
        answer.metadata["processor"] = "QueryProcessor"

        return answer

    def _assemble_response(
        self,
        answer: Answer,
        retrieval_results: List[RetrievalResult],
        query: str
    ) -> Answer:
        """
        Assemble the final response with metadata.

        Args:
            answer: Generated answer
            retrieval_results: List of retrieval results
            query: Original query

        Returns:
            Final answer with complete metadata
        """
        # Add query processing metadata
        answer.metadata.update({
            "query": query,
            "retrieved_docs": len(retrieval_results),
            "retrieval_scores": [r.score for r in retrieval_results],
            "retrieval_methods": [r.retrieval_method for r in retrieval_results],
            "query_processor_config": {
                "default_k": self.default_k,
                "min_confidence": self.min_confidence
            }
        })

        # Ensure sources come from the retrieved documents
        if not answer.sources and retrieval_results:
            answer.sources = [r.document for r in retrieval_results[:3]]

        return answer

    def explain_query(self, query: str) -> Dict[str, Any]:
        """
        Explain how a query would be processed.

        This method provides transparency into the query processing pipeline.

        Args:
            query: Query to explain

        Returns:
            Dictionary with the query processing plan
        """
        return {
            "original_query": query,
            "analyzed_query": self._analyze_query(query),
            "retrieval_k": self.default_k,
            "min_confidence": self.min_confidence,
            "processing_steps": [
                "1. Query analysis (currently passthrough)",
                "2. Document retrieval using configured retriever",
                "3. Context selection with confidence filtering",
                "4. Answer generation using configured generator",
                "5. Response assembly with metadata"
            ],
            "config": self.config
        }

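    # Output sketch for explain_query("what is RAG?") under the default
    # config (illustrative values only):
    #
    #     {
    #         "original_query": "what is RAG?",
    #         "analyzed_query": "what is RAG?",   # passthrough in Phase 1
    #         "retrieval_k": 5,
    #         "min_confidence": 0.0,
    #         "processing_steps": [...],
    #         "config": {},
    #     }
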
    def __str__(self) -> str:
        """String representation of the query processor."""
        return (f"QueryProcessor(retriever={type(self.retriever).__name__}, "
                f"generator={type(self.generator).__name__})")

    def __repr__(self) -> str:
        """Detailed representation of the query processor."""
        return (f"QueryProcessor(retriever={repr(self.retriever)}, "
                f"generator={repr(self.generator)}, "
                f"config={self.config})")