"""
Question parsing utilities for GAIA implementation.

This module provides functions for extracting key information from questions,
identifying required tools, and determining question complexity.
"""
|
|
|
import json
import logging
import re
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
|
|
# Module-level logger; the name is a fixed string rather than ``__name__``.
logger = logging.getLogger("gaia_agent.utils.parsing")

# Maps each complexity label to its numeric score (1 is the lowest, 4 the
# highest). determine_question_complexity() looks labels up by score.
COMPLEXITY_LEVELS = {
    "SIMPLE": 1,
    "MODERATE": 2,
    "COMPLEX": 3,
    "VERY_COMPLEX": 4,
}
|
|
|
def extract_key_information(question: str) -> Dict[str, Any]:
    """
    Extract key information from a question using regex heuristics.

    Args:
        question: The question to analyze

    Returns:
        Dictionary with the keys ``question_text`` (the input, unchanged),
        ``entities`` (capitalized words longer than one character),
        ``keywords`` (lowercased non-stop-words longer than two characters),
        ``question_type``, ``constraints``, ``temporal_references`` and
        ``expected_answer_type``. Every list holds plain strings.
    """

    def first_label(rules, default):
        # Rules are tried in order and the first matching pattern wins, so
        # rule order is significant (e.g. an auxiliary verb anywhere in the
        # text yields "yes_no" before "list"/"comparative" are considered).
        for pattern, label in rules:
            if re.search(pattern, question_lower):
                return label
        return default

    # Lowercase once; every pattern except entity detection is
    # case-insensitive by construction.
    question_lower = question.lower()

    result = {
        "question_text": question,
        "entities": [],
        "keywords": [],
        "question_type": "",
        "constraints": [],
        "temporal_references": [],
        "expected_answer_type": "unknown",
    }

    # Entities: capitalized words in the original casing. This also picks up
    # the sentence-initial word (e.g. "What").
    capitalized_words = re.findall(r'\b[A-Z][a-zA-Z]*\b', question)
    result["entities"] = [word for word in capitalized_words if len(word) > 1]

    stop_words = {"a", "an", "the", "is", "are", "was", "were", "be", "been",
                  "being", "to", "of", "and", "or", "in", "on", "at", "by",
                  "for", "with", "about", "against", "between", "into",
                  "through", "during", "before", "after", "above", "below",
                  "from", "up", "down", "out", "off", "over", "under", "again",
                  "further", "then", "once", "here", "there", "when", "where",
                  "why", "how", "all", "any", "both", "each", "few", "more",
                  "most", "other", "some", "such", "no", "nor", "not", "only",
                  "own", "same", "so", "than", "too", "very", "s", "t", "can",
                  "will", "just", "don", "should", "now", "d", "ll", "m", "o",
                  "re", "ve", "y", "ain", "aren", "couldn", "didn", "doesn",
                  "hadn", "hasn", "haven", "isn", "ma", "mightn", "mustn",
                  "needn", "shan", "shouldn", "wasn", "weren", "won", "wouldn"}

    words = re.findall(r'\b[a-zA-Z]+\b', question_lower)
    result["keywords"] = [
        word for word in words if word not in stop_words and len(word) > 2
    ]

    question_type_rules = (
        (r'\b(what|who|where|when|which)\b', "factual"),
        (r'\b(why|how)\b', "explanatory"),
        (r'\b(is|are|was|were|do|does|did|can|could|will|would|should|has|have)\b', "yes_no"),
        (r'\b(list|name|give|enumerate)\b', "list"),
        (r'\b(compare|contrast|difference|similarities)\b', "comparative"),
    )
    result["question_type"] = first_label(question_type_rules, "other")

    time_patterns = [
        r'\b(today|yesterday|tomorrow|now|current|latest|recent)\b',
        r'\b(in\s+\d{4})\b',
        r'\b(\d{4}s)\b',
        r'\b(last|this|next)\s+(day|week|month|year|decade)\b',
        r'\b(january|february|march|april|may|june|july|august|september|october|november|december)\b',
        r'\b(jan|feb|mar|apr|jun|jul|aug|sep|oct|nov|dec)\b',
    ]

    for pattern in time_patterns:
        # Use the whole match rather than re.findall(): the "last/this/next"
        # pattern has two groups, for which findall() would yield tuples and
        # leave the list with mixed element types.
        for match in re.finditer(pattern, question_lower):
            result["temporal_references"].append(" ".join(match.group(0).split()))

    answer_type_rules = (
        (r'\b(who|person)\b', "person"),
        (r'\b(where|location|place)\b', "location"),
        (r'\b(when|date|time|year)\b', "time"),
        (r'\b(how\s+many|count|number|sum|total)\b', "number"),
        (r'\b(why|reason|cause)\b', "reason"),
        (r'\b(how|process|steps|procedure)\b', "process"),
        (r'\b(list|examples|types)\b', "list"),
    )
    result["expected_answer_type"] = first_label(answer_type_rules, "unknown")

    # Each pattern starts with \b so the trigger must begin a word; without
    # it "begin trading" would produce the bogus constraint "in trading".
    constraint_patterns = [
        r'\b(only|just)\s+([^.,;!?]*)',
        r'\b(at\s+least|at\s+most|more\s+than|less\s+than|exactly)\s+(\d+)',
        r'\b(between)\s+(\d+)\s+and\s+(\d+)',
        r'\b(not|except|excluding)\s+([^.,;!?]*)',
        r'\b(from|in)\s+([^.,;!?]*)',
        r'\b(before|after|during)\s+([^.,;!?]*)',
    ]

    for pattern in constraint_patterns:
        # Every pattern has at least two groups, so findall() yields tuples;
        # the groups are joined with single spaces into one string.
        for groups in re.findall(pattern, question_lower):
            result["constraints"].append(' '.join(groups).strip())

    return result
|
|
|
def identify_required_tools(question: str) -> List[str]:
    """
    Identify tools required to answer a question.

    Each tool has a list of trigger terms. A tool is selected when any of its
    terms occurs in the lowercased question at the start of a word, so
    inflected forms such as "documents" or "explains" still match while
    embedded substrings such as "how" in "show" or "text" in "context" do not.

    Args:
        question: The question to analyze

    Returns:
        List of required tool names without duplicates, in the fixed order
        the tools are declared below. When no term matches, the fallback
        ``["reasoning", "web_search"]`` is returned.
    """
    question_lower = question.lower()

    # Declaration order defines the order of the returned list, which keeps
    # the result stable across runs (iterating a set of strings is not).
    tool_terms = (
        ("web_search", ("search", "find", "look up", "latest", "current", "news", "information about")),
        ("web_content", ("website", "webpage", "url", "link", "extract", "content")),
        ("reasoning", ("reason", "explain", "why", "how", "analyze", "understand", "interpret")),
        ("math", ("calculate", "compute", "solve", "equation", "math", "formula", "number")),
        ("fact_verification", ("verify", "check", "fact", "true", "false", "accurate", "correct")),
        ("image_analysis", ("image", "picture", "photo", "describe", "visual", "see", "look at")),
        ("chart_interpretation", ("chart", "graph", "plot", "diagram", "visualization", "trend", "data")),
        ("document_parsing", ("document", "pdf", "docx", "text", "extract", "parse", "read")),
    )

    required_tools = []
    for tool, terms in tool_terms:
        # A leading \b anchors every alternative to a word start; there is
        # deliberately no trailing \b so suffixes ("images") are accepted.
        pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
        if re.search(pattern, question_lower):
            required_tools.append(tool)

    if not required_tools:
        # Nothing matched: fall back to general reasoning plus a web search.
        required_tools = ["reasoning", "web_search"]

    return required_tools
|
|
|
def determine_question_complexity(question: str) -> Dict[str, Any]:
    """
    Determine the complexity of a question.

    The score starts at 1 and gains one point for each of: more than 15
    words, more than 30 words, more than one "?", a complex-reasoning term,
    more than one knowledge domain, more than two domains, a temporal term,
    and a conditional term. It is then capped at the highest value in
    ``COMPLEXITY_LEVELS``.

    Terms are matched case-insensitively at the start of a word, so "if" no
    longer fires inside "different" and "era" no longer fires inside
    "several", while inflected forms such as "impacts" still match.

    Args:
        question: The question to analyze

    Returns:
        Dictionary with the keys ``complexity_level`` (label from
        ``COMPLEXITY_LEVELS``), ``complexity_score``,
        ``requires_multiple_tools``, ``requires_complex_reasoning``,
        ``multi_domain`` and ``domains_involved``.
    """
    question_lower = question.lower()

    def mentions_any(terms):
        # Leading \b only: the term must begin a word but may carry a suffix.
        pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
        return re.search(pattern, question_lower) is not None

    complexity_score = 1

    # Length: one point past 15 words, a second point past 30.
    word_count = len(question.split())
    if word_count > 15:
        complexity_score += 1
    if word_count > 30:
        complexity_score += 1

    # More than one question mark suggests a multi-part question.
    if question.count("?") > 1:
        complexity_score += 1

    complex_reasoning_terms = [
        "why", "how", "explain", "analyze", "compare", "contrast",
        "evaluate", "assess", "interpret", "synthesize", "relationship",
        "impact", "effect", "cause", "implication", "consequence",
    ]
    requires_complex_reasoning = mentions_any(complex_reasoning_terms)
    if requires_complex_reasoning:
        complexity_score += 1

    domains = {
        "science": ["physics", "chemistry", "biology", "scientific", "experiment"],
        "math": ["math", "equation", "calculation", "formula", "compute"],
        "history": ["history", "historical", "ancient", "century", "era", "period"],
        "geography": ["geography", "country", "region", "map", "location"],
        "literature": ["book", "author", "novel", "character", "literary"],
        "arts": ["art", "painting", "music", "artist", "composition"],
        "technology": ["technology", "computer", "software", "hardware", "digital"],
        "business": ["business", "economy", "market", "finance", "industry"],
        "politics": ["politics", "government", "policy", "election", "law"],
    }

    # Computed once and reused for both the score and the returned list.
    domains_involved = [
        domain for domain, terms in domains.items() if mentions_any(terms)
    ]
    domain_count = len(domains_involved)

    if domain_count > 1:
        complexity_score += 1
    if domain_count > 2:
        complexity_score += 1

    temporal_terms = [
        "before", "after", "during", "while", "simultaneously",
        "previously", "subsequently", "meanwhile", "throughout",
        "initially", "finally", "eventually", "ultimately",
    ]
    if mentions_any(temporal_terms):
        complexity_score += 1

    conditional_terms = [
        "if", "unless", "assuming", "provided that", "in case",
        "depending on", "subject to", "given that", "on condition",
    ]
    if mentions_any(conditional_terms):
        complexity_score += 1

    # Cap at the highest defined level so the label lookup always succeeds.
    complexity_score = min(complexity_score, max(COMPLEXITY_LEVELS.values()))

    level_by_score = {score: level for level, score in COMPLEXITY_LEVELS.items()}
    complexity_level = level_by_score.get(complexity_score, "")

    return {
        "complexity_level": complexity_level,
        "complexity_score": complexity_score,
        "requires_multiple_tools": len(identify_required_tools(question)) > 1,
        "requires_complex_reasoning": requires_complex_reasoning,
        "multi_domain": domain_count > 1,
        "domains_involved": domains_involved,
    }
|
|
|
def parse_question(question: str) -> Dict[str, Any]:
    """
    Comprehensive parsing of a question.

    Runs the three analyses defined in this module and bundles their results.

    Args:
        question: The question to parse

    Returns:
        Dictionary with the keys ``question`` (the input, unchanged),
        ``key_information`` (from ``extract_key_information``),
        ``required_tools`` (from ``identify_required_tools``) and
        ``complexity`` (from ``determine_question_complexity``).
    """
    return {
        "question": question,
        "key_information": extract_key_information(question),
        "required_tools": identify_required_tools(question),
        "complexity": determine_question_complexity(question),
    }