File size: 9,763 Bytes
5e1a30c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""
Query Processor - Handles query execution workflow.

This component focuses solely on query processing logic,
separated from platform orchestration concerns. In Phase 1,
it extracts the query logic from RAGPipeline.
"""

import logging
from typing import List, Dict, Any, Optional

from .interfaces import Answer, RetrievalResult, Retriever, AnswerGenerator, Document

logger = logging.getLogger(__name__)


class QueryProcessor:
    """
    Query Processor handles the query execution workflow.

    Responsibilities:
    - Query analysis and enhancement (currently a passthrough)
    - Retrieval orchestration
    - Context selection by minimum retrieval score
    - Answer generation coordination
    - Response assembly (metadata plus fallback sources)

    In Phase 1, this component works with component references passed
    during initialization. In Phase 3, it will be enhanced with direct
    wiring and additional features like caching.
    """

    # Number of top retrieval results used as context when no result
    # reaches ``min_confidence``.
    FALLBACK_CONTEXT_LIMIT = 3

    # Number of top retrieval results reported as sources when the
    # generated answer carries no sources of its own.
    FALLBACK_SOURCE_LIMIT = 3

    def __init__(
        self,
        retriever: Retriever,
        generator: AnswerGenerator,
        config: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize query processor with component references.

        Args:
            retriever: Retriever component for document search
            generator: Answer generator component for response generation
            config: Optional configuration dictionary. Recognized keys are
                ``retrieval_k`` (default 5) and ``min_confidence``
                (default 0.0).
        """
        self.retriever = retriever
        self.generator = generator
        self.config = config or {}

        # Extract configuration values once; later changes to ``config``
        # do not affect these attributes.
        self.default_k = self.config.get('retrieval_k', 5)
        self.min_confidence = self.config.get('min_confidence', 0.0)

        logger.info("Query processor initialized")

    def process(self, query: str, k: Optional[int] = None) -> Answer:
        """
        Process a query and return an answer.

        This method orchestrates the query processing workflow:
        1. Analyze query (placeholder for future enhancement)
        2. Retrieve relevant documents
        3. Select and rank context
        4. Generate answer
        5. Assemble final response

        Args:
            query: User query string
            k: Number of documents to retrieve. A falsy value (None or 0)
                selects the configured default.

        Returns:
            Answer object with generated text, sources, and metadata

        Raises:
            ValueError: If query is not a string or is empty/whitespace
            RuntimeError: If any workflow step fails; the original
                exception is chained as the cause
        """
        # Validate before touching the string so that None / non-string
        # input produces the documented ValueError, not AttributeError.
        if not isinstance(query, str) or not query.strip():
            raise ValueError("Query cannot be empty")

        k = k or self.default_k

        logger.info("Processing query: %s...", query[:100])

        try:
            # Step 1: Analyze query (future enhancement placeholder)
            analyzed_query = self._analyze_query(query)

            # Step 2: Retrieve relevant documents
            retrieval_results = self._retrieve_documents(analyzed_query, k)

            # Step 3: Select and rank context
            context_docs = self._select_context(retrieval_results)

            # Step 4: Generate answer from the ORIGINAL query text
            answer = self._generate_answer(query, context_docs)

            # Step 5: Assemble final response
            final_answer = self._assemble_response(
                answer, retrieval_results, query
            )

            logger.info(
                "Query processed successfully with confidence: %s",
                final_answer.confidence,
            )
            return final_answer

        except Exception as e:
            logger.error("Query processing failed: %s", e)
            raise RuntimeError(f"Query processing failed: {e}") from e

    def _analyze_query(self, query: str) -> str:
        """
        Analyze and potentially enhance the query.

        In Phase 1, this is a placeholder that returns the query as-is.
        In Phase 3, this will be enhanced with actual query analysis.

        Args:
            query: Original user query

        Returns:
            Analyzed/enhanced query (currently the input, unchanged)
        """
        # Placeholder for future enhancement
        return query

    def _retrieve_documents(self, query: str, k: int) -> List[RetrievalResult]:
        """
        Retrieve relevant documents for the query.

        Args:
            query: Query string
            k: Number of documents to retrieve

        Returns:
            Whatever ``self.retriever.retrieve(query, k)`` returns
        """
        logger.debug("Retrieving %s documents for query: %s...", k, query[:50])

        results = self.retriever.retrieve(query, k)

        if not results:
            logger.warning("No documents retrieved for query: %s", query)
        else:
            # Only the first three scores are logged to keep the line short.
            logger.debug(
                "Retrieved %s documents with scores: %s...",
                len(results),
                [r.score for r in results[:3]],
            )

        return results

    def _select_context(self, results: List[RetrievalResult]) -> List[Document]:
        """
        Select and rank context documents.

        In Phase 1, this applies basic filtering based on confidence.
        In Phase 3, this will be enhanced with re-ranking and other features.

        Args:
            results: List of retrieval results

        Returns:
            Documents of the results whose score is at least
            ``min_confidence``. If none qualify, the documents of the first
            ``FALLBACK_CONTEXT_LIMIT`` results are returned instead. An
            empty list is returned when ``results`` is empty or None.
        """
        if not results:
            return []

        # Apply minimum confidence filtering
        filtered_results = [
            r for r in results
            if r.score >= self.min_confidence
        ]

        if not filtered_results:
            logger.warning(
                "No results passed confidence threshold of %s",
                self.min_confidence,
            )
            # Return top results anyway if nothing passes threshold
            filtered_results = results[:self.FALLBACK_CONTEXT_LIMIT]

        # Extract documents
        context_docs = [r.document for r in filtered_results]

        logger.debug("Selected %s context documents", len(context_docs))
        return context_docs

    def _generate_answer(self, query: str, context_docs: List[Document]) -> Answer:
        """
        Generate answer from query and context.

        Args:
            query: User query
            context_docs: List of context documents

        Returns:
            The generator's answer tagged with ``metadata["processor"]``,
            or a fixed zero-confidence "no information" answer when
            ``context_docs`` is empty (the generator is not called then).
        """
        if not context_docs:
            return Answer(
                text="No relevant information found for your query.",
                sources=[],
                confidence=0.0,
                metadata={
                    "query": query,
                    "context_docs": 0,
                    "processor": "QueryProcessor"
                }
            )

        logger.debug("Generating answer with %s context documents", len(context_docs))

        # Use answer generator
        answer = self.generator.generate(query, context_docs)

        # Add processor metadata
        if not answer.metadata:
            answer.metadata = {}
        answer.metadata["processor"] = "QueryProcessor"

        return answer

    def _assemble_response(
        self,
        answer: Answer,
        retrieval_results: List[RetrievalResult],
        query: str
    ) -> Answer:
        """
        Assemble the final response with metadata.

        The answer is modified in place and returned.

        Args:
            answer: Generated answer
            retrieval_results: List of retrieval results (None is treated
                as an empty list)
            query: Original query

        Returns:
            The same answer object with retrieval metadata added and, if it
            had no sources, the documents of the first
            ``FALLBACK_SOURCE_LIMIT`` retrieval results as sources.
        """
        # Tolerate a retriever that yielded None and an answer without a
        # metadata dict, so assembly never fails on missing containers.
        results = retrieval_results or []
        if answer.metadata is None:
            answer.metadata = {}

        # Add query processing metadata
        answer.metadata.update({
            "query": query,
            "retrieved_docs": len(results),
            "retrieval_scores": [r.score for r in results],
            "retrieval_methods": [r.retrieval_method for r in results],
            "query_processor_config": {
                "default_k": self.default_k,
                "min_confidence": self.min_confidence
            }
        })

        # Ensure sources are from the retrieved documents
        if not answer.sources and results:
            answer.sources = [
                r.document for r in results[:self.FALLBACK_SOURCE_LIMIT]
            ]

        return answer

    def explain_query(self, query: str) -> Dict[str, Any]:
        """
        Explain how a query would be processed.

        This method provides transparency into the query processing pipeline.
        No retrieval or generation is performed.

        Args:
            query: Query to explain

        Returns:
            Dictionary with query processing plan. The ``config`` entry is
            a shallow copy, so editing it does not alter this processor's
            configuration.
        """
        return {
            "original_query": query,
            "analyzed_query": self._analyze_query(query),
            "retrieval_k": self.default_k,
            "min_confidence": self.min_confidence,
            "processing_steps": [
                "1. Query analysis (currently passthrough)",
                "2. Document retrieval using configured retriever",
                "3. Context selection with confidence filtering",
                "4. Answer generation using configured generator",
                "5. Response assembly with metadata"
            ],
            "config": dict(self.config)
        }

    def __str__(self) -> str:
        """String representation of the query processor."""
        return (f"QueryProcessor(retriever={type(self.retriever).__name__}, "
                f"generator={type(self.generator).__name__})")

    def __repr__(self) -> str:
        """Detailed representation of the query processor."""
        return (f"QueryProcessor(retriever={self.retriever!r}, "
                f"generator={self.generator!r}, "
                f"config={self.config})")