File size: 10,592 Bytes
c922f8b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
Question parsing utilities for GAIA implementation.

This module provides functions for extracting key information from questions,
identifying required tools, and determining question complexity.
"""

import re
import logging
from typing import Dict, Any, List, Optional, Union, Tuple, Set
import json

# Module-level logger. The logger name is a hard-coded dotted path rather than
# being derived from ``__name__``.
logger = logging.getLogger("gaia_agent.utils.parsing")

# Maps each complexity level name to its integer score (1 = lowest, 4 = highest).
# determine_question_complexity() clamps its computed score to at most 4 and
# looks the level name up in this table by score.
COMPLEXITY_LEVELS = {
    "SIMPLE": 1,  # Single fact or straightforward question
    "MODERATE": 2,  # Multiple facts or moderate reasoning
    "COMPLEX": 3,  # Complex reasoning or multiple steps
    "VERY_COMPLEX": 4  # Advanced reasoning, multiple domains, or specialized knowledge
}

def _collect_pattern_matches(patterns: List[str], text: str) -> List[str]:
    """Return every match of *patterns* in *text* as a flat list of strings.

    The capture groups of each match are joined with single spaces, so a
    pattern with several groups yields one string (not a tuple) per match.
    """
    found: List[str] = []
    for pattern in patterns:
        for match in re.finditer(pattern, text):
            found.append(" ".join(match.groups()).strip())
    return found


def extract_key_information(question: str) -> Dict[str, Any]:
    """
    Extract key information from a question.

    All matching is heuristic and regex based. Apart from entity detection,
    every check runs against the lower-cased question.

    Args:
        question: The question to analyze

    Returns:
        Dictionary with the keys:
            question_text: the question exactly as passed in
            entities: capitalized words longer than one letter (this
                includes a capitalized sentence-initial word)
            keywords: lower-cased alphabetic words longer than two letters
                that are not stop words
            question_type: "factual", "explanatory", "yes_no", "list",
                "comparative" or "other" (first matching rule wins)
            constraints: constraint phrases, each a string starting with
                the constraint keyword
            temporal_references: time expressions, each a string
            expected_answer_type: "person", "location", "time", "number",
                "reason", "process", "list" or "unknown"
    """
    question_lower = question.lower()

    capitalized_words = re.findall(r'\b[A-Z][a-zA-Z]*\b', question)
    entities = [word for word in capitalized_words if len(word) > 1]

    stop_words = {"a", "an", "the", "is", "are", "was", "were", "be", "been",
                  "being", "to", "of", "and", "or", "in", "on", "at", "by",
                  "for", "with", "about", "against", "between", "into",
                  "through", "during", "before", "after", "above", "below",
                  "from", "up", "down", "out", "off", "over", "under", "again",
                  "further", "then", "once", "here", "there", "when", "where",
                  "why", "how", "all", "any", "both", "each", "few", "more",
                  "most", "other", "some", "such", "no", "nor", "not", "only",
                  "own", "same", "so", "than", "too", "very", "s", "t", "can",
                  "will", "just", "don", "should", "now", "d", "ll", "m", "o",
                  "re", "ve", "y", "ain", "aren", "couldn", "didn", "doesn",
                  "hadn", "hasn", "haven", "isn", "ma", "mightn", "mustn",
                  "needn", "shan", "shouldn", "wasn", "weren", "won", "wouldn"}

    words = re.findall(r'\b[a-zA-Z]+\b', question_lower)
    keywords = [word for word in words if word not in stop_words and len(word) > 2]

    # Ordered rules: the first pattern found anywhere in the question wins.
    question_type_rules = (
        (r'\b(what|who|where|when|which)\b', "factual"),
        (r'\b(why|how)\b', "explanatory"),
        (r'\b(is|are|was|were|do|does|did|can|could|will|would|should|has|have)\b', "yes_no"),
        (r'\b(list|name|give|enumerate)\b', "list"),
        (r'\b(compare|contrast|difference|similarities)\b', "comparative"),
    )
    question_type = next(
        (label for pattern, label in question_type_rules
         if re.search(pattern, question_lower)),
        "other",
    )

    time_patterns = [
        r'\b(today|yesterday|tomorrow|now|current|latest|recent)\b',
        r'\b(in\s+\d{4})\b',  # in 2023
        r'\b(\d{4}s)\b',  # 1990s
        r'\b(last|this|next)\s+(day|week|month|year|decade)\b',
        r'\b(january|february|march|april|may|june|july|august|september|october|november|december)\b',
        r'\b(jan|feb|mar|apr|jun|jul|aug|sep|oct|nov|dec)\b'
    ]
    # Joining the groups keeps every entry a plain string; re.findall would
    # return a tuple for the two-group "last|this|next ..." pattern.
    temporal_references = _collect_pattern_matches(time_patterns, question_lower)

    # Ordered rules: the first pattern found anywhere in the question wins.
    answer_type_rules = (
        (r'\b(who|person)\b', "person"),
        (r'\b(where|location|place)\b', "location"),
        (r'\b(when|date|time|year)\b', "time"),
        (r'\b(how\s+many|count|number|sum|total)\b', "number"),
        (r'\b(why|reason|cause)\b', "reason"),
        (r'\b(how|process|steps|procedure)\b', "process"),
        (r'\b(list|examples|types)\b', "list"),
    )
    expected_answer_type = next(
        (label for pattern, label in answer_type_rules
         if re.search(pattern, question_lower)),
        "unknown",
    )

    # The leading \b stops a keyword from matching inside another word,
    # e.g. "in" inside "captain" or "just" inside "adjust".
    constraint_patterns = [
        r'\b(only|just)\s+([^.,;!?]*)',
        r'\b(at\s+least|at\s+most|more\s+than|less\s+than|exactly)\s+(\d+)',
        r'\b(between)\s+(\d+)\s+and\s+(\d+)',
        r'\b(not|except|excluding)\s+([^.,;!?]*)',
        r'\b(from|in)\s+([^.,;!?]*)',
        r'\b(before|after|during)\s+([^.,;!?]*)'
    ]
    constraints = _collect_pattern_matches(constraint_patterns, question_lower)

    return {
        "question_text": question,
        "entities": entities,
        "keywords": keywords,
        "question_type": question_type,
        "constraints": constraints,
        "temporal_references": temporal_references,
        "expected_answer_type": expected_answer_type
    }

def identify_required_tools(question: str) -> List[str]:
    """
    Identify tools required to answer a question.

    A tool is selected when any of its trigger terms occurs in the
    lower-cased question at the start of a word. Inflected forms still
    match ("images", "calculated"), but a term buried inside another word
    does not ("how" in "show", "read" in "already", "text" in "context").

    Args:
        question: The question to analyze

    Returns:
        List of required tool names without duplicates, in the fixed order
        the tools are declared below. When no trigger term is found, the
        default ["reasoning", "web_search"] is returned.
    """
    question_lower = question.lower()

    # (tool name, trigger terms); declaration order defines the result order.
    tool_triggers = (
        ("web_search", ["search", "find", "look up", "latest", "current", "news", "information about"]),
        ("web_content", ["website", "webpage", "url", "link", "extract", "content"]),
        ("reasoning", ["reason", "explain", "why", "how", "analyze", "understand", "interpret"]),
        ("math", ["calculate", "compute", "solve", "equation", "math", "formula", "number"]),
        ("fact_verification", ["verify", "check", "fact", "true", "false", "accurate", "correct"]),
        ("image_analysis", ["image", "picture", "photo", "describe", "visual", "see", "look at"]),
        ("chart_interpretation", ["chart", "graph", "plot", "diagram", "visualization", "trend", "data"]),
        ("document_parsing", ["document", "pdf", "docx", "text", "extract", "parse", "read"]),
    )

    required_tools = []
    for tool, terms in tool_triggers:
        # One alternation per tool, anchored at a word start by the leading \b.
        pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
        if re.search(pattern, question_lower):
            required_tools.append(tool)

    if not required_tools:
        # Nothing specific was detected: fall back to general-purpose tools.
        required_tools = ["reasoning", "web_search"]

    return required_tools

def determine_question_complexity(question: str) -> Dict[str, Any]:
    """
    Determine the complexity of a question.

    The score starts at 1 and gains one point for each of the following
    signals, then is capped at 4: more than 15 words, more than 30 words,
    more than one question mark, a complex-reasoning term, more than one
    knowledge domain, more than two knowledge domains, a temporal term, and
    a conditional term.

    Terms are matched against the lower-cased question at the start of a
    word, so a term buried inside another word does not count ("if" in
    "different", "art" in "start", "era" in "several").

    Args:
        question: The question to analyze

    Returns:
        Dictionary with the keys:
            complexity_level: name from COMPLEXITY_LEVELS for the score
            complexity_score: integer score between 1 and 4
            requires_multiple_tools: True when identify_required_tools()
                returns more than one tool
            requires_complex_reasoning: True when a complex-reasoning term
                was found
            multi_domain: True when more than one domain was detected
            domains_involved: names of the detected domains
    """
    question_lower = question.lower()

    def mentions_any(terms: List[str]) -> bool:
        """Return True if any term occurs in the question at a word start."""
        pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
        return re.search(pattern, question_lower) is not None

    complex_reasoning_terms = [
        "why", "how", "explain", "analyze", "compare", "contrast",
        "evaluate", "assess", "interpret", "synthesize", "relationship",
        "impact", "effect", "cause", "implication", "consequence"
    ]

    domains = {
        "science": ["physics", "chemistry", "biology", "scientific", "experiment"],
        "math": ["math", "equation", "calculation", "formula", "compute"],
        "history": ["history", "historical", "ancient", "century", "era", "period"],
        "geography": ["geography", "country", "region", "map", "location"],
        "literature": ["book", "author", "novel", "character", "literary"],
        "arts": ["art", "painting", "music", "artist", "composition"],
        "technology": ["technology", "computer", "software", "hardware", "digital"],
        "business": ["business", "economy", "market", "finance", "industry"],
        "politics": ["politics", "government", "policy", "election", "law"]
    }

    temporal_terms = [
        "before", "after", "during", "while", "simultaneously",
        "previously", "subsequently", "meanwhile", "throughout",
        "initially", "finally", "eventually", "ultimately"
    ]

    conditional_terms = [
        "if", "unless", "assuming", "provided that", "in case",
        "depending on", "subject to", "given that", "on condition"
    ]

    # Evaluate each signal once and reuse it for both the score and the result.
    word_count = len(question.split())
    needs_complex_reasoning = mentions_any(complex_reasoning_terms)
    domains_involved = [name for name, terms in domains.items() if mentions_any(terms)]
    domain_count = len(domains_involved)

    signals = (
        word_count > 15,
        word_count > 30,
        question.count("?") > 1,
        needs_complex_reasoning,
        domain_count > 1,
        domain_count > 2,
        mentions_any(temporal_terms),
        mentions_any(conditional_terms),
    )
    # Start at 1 (SIMPLE) and cap at 4 (VERY_COMPLEX).
    complexity_score = min(1 + sum(signals), 4)

    level_by_score = {score: level for level, score in COMPLEXITY_LEVELS.items()}
    complexity_level = level_by_score.get(complexity_score, "")

    return {
        "complexity_level": complexity_level,
        "complexity_score": complexity_score,
        "requires_multiple_tools": len(identify_required_tools(question)) > 1,
        "requires_complex_reasoning": needs_complex_reasoning,
        "multi_domain": domain_count > 1,
        "domains_involved": domains_involved
    }

def parse_question(question: str) -> Dict[str, Any]:
    """
    Run every analysis in this module on a question and bundle the results.

    Args:
        question: The question to parse

    Returns:
        Dictionary with the original text under "question", the output of
        extract_key_information() under "key_information", the output of
        identify_required_tools() under "required_tools", and the output of
        determine_question_complexity() under "complexity"
    """
    # Run the analyses in the same order as the keys of the returned dict.
    key_information = extract_key_information(question)
    required_tools = identify_required_tools(question)
    complexity = determine_question_complexity(question)

    parsed: Dict[str, Any] = {"question": question}
    parsed["key_information"] = key_information
    parsed["required_tools"] = required_tools
    parsed["complexity"] = complexity
    return parsed