"""
Formatting utilities for GAIA implementation.

This module provides functions for extracting, formatting, and validating text content,
including answers, dates, numbers, and structured data.
"""
|
|
|
import re |
|
import json |
|
import logging |
|
from typing import Dict, Any, List, Optional, Union, Tuple, Set |
|
|
|
# Module-level logger used by the helpers in this file.
logger = logging.getLogger("gaia_agent.utils.formatting")


# Answer-format identifiers, keyed by their upper-case constant name.
FORMAT_TYPES = dict(
    TEXT="text",
    NUMBER="number",
    DATE="date",
    BOOLEAN="boolean",
    LIST="list",
    ENTITY="entity",
    STRUCTURED="structured",
)
|
|
|
def extract_answer(response: str, expected_format: Optional[str] = None) -> str:
    """
    Extract the answer from a response.

    Strategy, in order:
      1. A response shorter than 20 words is returned as-is (stripped).
      2. The text following a lead-in phrase ("The answer is", "Therefore",
         ...) is returned, up to the end of that sentence.
      3. If ``expected_format`` names a known format, the matching
         format-specific extractor is applied to the whole response.
      4. Otherwise the last sentence of the response is returned.

    Args:
        response: The full response text
        expected_format: Optional expected format type

    Returns:
        The extracted answer
    """
    if len(response.split()) < 20:
        return response.strip()

    # A period ends the captured answer unless a digit follows it, so that
    # decimal values such as "3.14" are not cut off at the decimal point.
    answer_body = r'((?:[^.]|\.(?=\d))+)'

    lead_ins = [
        r'The\s+answer\s+is',
        r'In\s+conclusion',
        r'To\s+summarize',
        r'Therefore',
        r'Thus',
        r'In\s+summary',
        r'The\s+result\s+is',
    ]

    for lead_in in lead_ins:
        # \b keeps a lead-in from matching inside a longer word ("Malthus ").
        match = re.search(rf'\b(?:{lead_in}:?\s+){answer_body}', response, re.IGNORECASE)
        if match:
            return match.group(1).strip()

    if expected_format:
        # Built lazily so the table is only needed when a format is requested.
        extractors = {
            FORMAT_TYPES["NUMBER"]: extract_number,
            FORMAT_TYPES["DATE"]: extract_date,
            FORMAT_TYPES["BOOLEAN"]: extract_boolean,
            FORMAT_TYPES["LIST"]: extract_list,
            FORMAT_TYPES["ENTITY"]: extract_entity,
            FORMAT_TYPES["STRUCTURED"]: extract_structured_data,
        }
        extractor = extractors.get(expected_format)
        if extractor is not None:
            return extractor(response)

    # Strip before splitting: trailing whitespace after the final terminator
    # would otherwise produce an empty last "sentence". The response has at
    # least 20 words here, so the last piece is never empty.
    sentences = re.split(r'[.!?]\s+', response.strip())
    return sentences[-1].strip()
|
|
|
def extract_number(text: str) -> str:
    """
    Pull a numeric answer out of free text.

    A number introduced by a cue phrase ("The answer is", "equals",
    "approximately", "about") wins; cues are tried in that order. Without a
    cue, the last number appearing in the text is used.

    Args:
        text: The text to extract from

    Returns:
        The extracted number as a string, or "" when the text has no number
    """
    # Optional currency symbol, optional sign, digits (with or without
    # thousands separators), optional decimals, optional percent sign.
    number_pattern = r'(?:[\$€£¥])?(?:[\-+])?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?(?:\s*%)?'

    cues = (
        r'The\s+(?:answer|result|number|value|amount)\s+is:?\s+',
        r'equals\s+',
        r'approximately\s+',
        r'about\s+',
    )

    # Lazy generator: later cues are only searched if earlier ones miss.
    cue_hits = (
        re.search(rf'(?:{cue})({number_pattern})', text, re.IGNORECASE)
        for cue in cues
    )
    first_hit = next((hit for hit in cue_hits if hit), None)
    if first_hit is not None:
        return first_hit.group(1).strip()

    candidates = re.findall(number_pattern, text)
    return candidates[-1].strip() if candidates else ""
|
|
|
def extract_date(text: str) -> str:
    """
    Extract a date or time from text.

    A date introduced by a cue phrase ("The date is", "occurred on",
    "happened on", "scheduled for") is preferred; cue matching is
    case-insensitive. Otherwise the first match of the date patterns, tried
    in the order listed below, is returned.

    Args:
        text: The text to extract from

    Returns:
        The extracted date as a string, or "" when nothing matches
    """
    months = r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)'

    # The (?<!\d) / (?!\d) guards stop a pattern from matching in the middle
    # of a longer digit run, e.g. "24-01-15" inside "2024-01-15".
    date_patterns = [
        r'(?<!\d)\d{1,2}[-/]\d{1,2}[-/]\d{2,4}(?!\d)',
        r'(?<!\d)\d{4}[-/]\d{1,2}[-/]\d{1,2}(?!\d)',
        rf'\b{months}[a-z]*\s+\d{{1,2}}(?:st|nd|rd|th)?,?\s+\d{{4}}(?!\d)',
        rf'(?<!\d)\d{{1,2}}(?:st|nd|rd|th)?\s+(?:of\s+)?{months}[a-z]*,?\s+\d{{4}}(?!\d)',
        r'(?<!\d)\d{1,2}:\d{2}(?::\d{2})?(?:\s*(?:AM|PM|am|pm)\b)?',
    ]

    # Wrap every alternative in its own non-capturing group so the cue prefix
    # applies to ALL of them and the date is always capture group 1.
    any_date = "|".join(f"(?:{pattern})" for pattern in date_patterns)

    cue_phrases = [
        r'The\s+(?:date|time)\s+is:?\s+',
        r'occurred\s+on\s+',
        r'happened\s+on\s+',
        r'scheduled\s+for\s+',
    ]

    for cue in cue_phrases:
        match = re.search(rf'(?:{cue})({any_date})', text, re.IGNORECASE)
        if match:
            return match.group(1).strip()

    # No cue phrase: fall back to the first pattern that matches anywhere.
    for pattern in date_patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(0).strip()

    return ""
|
|
|
def extract_boolean(text: str) -> str:
    """
    Extract a boolean answer (yes/no, true/false) from text.

    Explicit answers are checked first: a leading "yes"/"no"-style word or a
    "the answer is ..." phrase. If none is present, the text is scored by the
    number of distinct negative and positive terms it contains. All matching
    is done on whole words.

    Args:
        text: The text to extract from

    Returns:
        "Yes", "No", or "Unknown" when the text gives no clear signal
    """
    # Lower-case, trim, and normalise the typographic apostrophe so that
    # contractions written either way match the terms below.
    text_lower = text.lower().strip().replace("\u2019", "'")

    # \b prevents prefix hits such as "yesterday" -> yes or "notably" -> no.
    yes_patterns = [
        r'^(?:yes|affirmative|correct|true)\b',
        r'\bthe answer is (?:yes|affirmative|true|correct)\b',
    ]
    no_patterns = [
        r'^(?:no|negative|incorrect|false)\b',
        r'\bthe answer is (?:no|negative|false|incorrect)\b',
    ]

    if any(re.search(pattern, text_lower) for pattern in yes_patterns):
        return "Yes"
    if any(re.search(pattern, text_lower) for pattern in no_patterns):
        return "No"

    positive_terms = ["can", "does", "is", "will", "should", "would", "could", "positive", "affirmative"]
    negative_terms = ["cannot", "can't", "doesn't", "does not", "isn't", "is not", "won't", "will not",
                      "shouldn't", "should not", "wouldn't", "would not", "couldn't", "could not",
                      "negative", "never"]

    def _has_word(term: str, haystack: str) -> bool:
        """Return True when *term* occurs in *haystack* as a whole word/phrase."""
        return re.search(rf'\b{re.escape(term)}\b', haystack) is not None

    negative_count = sum(1 for term in negative_terms if _has_word(term, text_lower))

    # Remove negations before counting positives; otherwise "is not" would
    # also score a point for "is" and cancel itself out.
    negation_regex = r'\b(?:' + '|'.join(re.escape(term) for term in negative_terms) + r')\b'
    remainder = re.sub(negation_regex, ' ', text_lower)
    positive_count = sum(1 for term in positive_terms if _has_word(term, remainder))

    if positive_count > negative_count:
        return "Yes"
    if negative_count > positive_count:
        return "No"
    return "Unknown"
|
|
|
def extract_list(text: str) -> str:
    """
    Extract a comma-separated list from text.

    Three strategies are tried in order:
      1. Items following a lead-in such as "The items are:" (up to the end of
         that sentence or line), split on commas, semicolons and "and".
      2. Bulleted or numbered lines ("- x", "* x", "1. x", "1) x").
      3. An inline run of at least three comma-separated items at the start
         of the text or after a colon.

    Args:
        text: The text to extract from

    Returns:
        The extracted list as a comma-separated string, or "" if none is found
    """
    list_patterns = [
        r'(?:The\s+list\s+is:?\s+)(.*?)(?:\.|\n|$)',
        r'(?:The\s+items\s+are:?\s+)(.*?)(?:\.|\n|$)',
        r'(?:The\s+elements\s+are:?\s+)(.*?)(?:\.|\n|$)',
        r'(?:They\s+are:?\s+)(.*?)(?:\.|\n|$)',
    ]

    # A separator is a comma/semicolon (optionally followed by "and", which
    # covers the Oxford comma) or a bare "and" between two items.
    separator = r'\s*[,;]\s*(?:and\s+)?|\s+and\s+'

    for pattern in list_patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            pieces = re.split(separator, match.group(1).strip())
            items = [piece.strip() for piece in pieces if piece.strip()]
            if items:
                return ', '.join(items)

    # MULTILINE anchors each bullet to its own line; matching the newline
    # itself would consume it and hide the bullet on the following line.
    bullet_pattern = r'^[ \t]*(?:\d+[.)]|[-•*])[ \t]+(\S.*?)[ \t]*$'
    bullet_matches = re.findall(bullet_pattern, text, re.MULTILINE)
    if bullet_matches:
        return ', '.join(item.strip() for item in bullet_matches)

    comma_pattern = r'(?:^|:)\s*((?:\w+(?:\s+\w+)*,\s*){2,}(?:\w+(?:\s+\w+)*))'
    comma_match = re.search(comma_pattern, text)
    if comma_match:
        return comma_match.group(1).strip()

    return ""
|
|
|
def extract_entity(text: str) -> str:
    """
    Pull a named entity (person, place, organization) out of text.

    An entity announced by a phrase such as "The person is ..." is preferred.
    Failing that, the first run of capitalised words that is not, as a whole,
    one of a few common sentence-starting words is returned.

    Args:
        text: The text to extract from

    Returns:
        The extracted entity as a string, or "" when none is found
    """
    # One or more capitalised words separated by whitespace.
    name = r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'

    role_groups = ("person|individual", "place|location", "organization|company", "entity")
    for roles in role_groups:
        announced = re.search(rf'(?:The\s+(?:{roles})\s+is:?\s+)({name})', text)
        if announced:
            return announced.group(1).strip()

    common_words = {"The", "A", "An", "This", "That", "These", "Those", "It", "Its", "I", "My", "Me", "You", "Your"}

    # The filter compares the whole capitalised run, not its individual words.
    candidates = re.findall(rf'({name})', text)
    first_entity = next((run for run in candidates if run not in common_words), "")
    return first_entity.strip()
|
|
|
def extract_structured_data(text: str) -> str:
    """
    Extract structured data (a JSON object) from text.

    Fenced code blocks (``` or ```json) are checked first. If none holds a
    valid JSON object, the text is scanned for the first "{" from which a
    complete JSON object can be decoded, so nested objects are supported.

    Args:
        text: The text to extract from

    Returns:
        The extracted object re-serialised with 2-space indentation, or ""
        when no valid JSON object is found
    """
    for fenced in re.finditer(r'```(?:json)?\s*([\s\S]*?)```', text):
        candidate = fenced.group(1).strip()
        if not candidate.startswith("{"):
            continue
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            logger.warning("Found JSON-like block but it's not valid JSON")
            continue
        return json.dumps(parsed, indent=2)

    # A regex cannot balance nested braces, so let the JSON decoder find the
    # end of the object starting at each "{" in turn.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
        else:
            return json.dumps(parsed, indent=2)

    return ""
|
|
|
def format_answer(answer: str, format_type: str) -> str:
    """
    Format an answer according to the specified format type.

    For every format other than text, the matching extractor is run on the
    answer. If it finds nothing, the stripped answer is returned unchanged.
    Unrecognised format types are treated like text.

    Args:
        answer: The answer to format
        format_type: The desired format type

    Returns:
        The formatted answer
    """
    stripped = answer.strip()

    if format_type == FORMAT_TYPES["TEXT"]:
        return stripped

    if format_type == FORMAT_TYPES["NUMBER"]:
        number_str = extract_number(answer)
        if not number_str:
            return stripped

        # Drop currency symbols, thousands separators, percent signs, etc.
        clean_number = re.sub(r'[^\d\.\-\+]', '', number_str)

        # Whole numbers are parsed as int so that values above 2**53 are not
        # rounded by a detour through float.
        try:
            return str(int(clean_number))
        except ValueError:
            pass

        try:
            num = float(clean_number)
        except ValueError:
            return number_str
        return str(int(num)) if num.is_integer() else str(num)

    if format_type == FORMAT_TYPES["BOOLEAN"]:
        bool_str = extract_boolean(answer)
        if bool_str in ["Yes", "No", "Unknown"]:
            return bool_str
        return stripped

    if format_type == FORMAT_TYPES["LIST"]:
        list_str = extract_list(answer)
        if list_str:
            # Normalise spacing around the commas.
            return ', '.join(item.strip() for item in list_str.split(','))
        return stripped

    # Remaining formats share one rule: use the extractor's result if it
    # found something, otherwise keep the stripped answer.
    passthrough_extractors = {
        FORMAT_TYPES["DATE"]: extract_date,
        FORMAT_TYPES["ENTITY"]: extract_entity,
        FORMAT_TYPES["STRUCTURED"]: extract_structured_data,
    }
    extractor = passthrough_extractors.get(format_type)
    if extractor is not None:
        return extractor(answer) or stripped

    return stripped
|
|
|
def validate_answer_format(answer: str, expected_format: str) -> Dict[str, Any]:
    """
    Check whether an answer conforms to the expected format.

    Args:
        answer: The answer to validate
        expected_format: The expected format type

    Returns:
        Dictionary with keys "is_valid" (bool), "formatted_answer" (the
        answer, normalised for number/boolean/list formats when valid) and
        "error" (a message, or None when the answer is valid). Unrecognised
        format types are always valid.
    """
    formatted = answer
    error = None

    if expected_format == FORMAT_TYPES["NUMBER"]:
        # Keep only digits, decimal point and sign before parsing.
        numeric_text = re.sub(r'[^\d\.\-\+]', '', answer)
        try:
            float(numeric_text)
        except ValueError:
            error = "Answer is not a valid number"
        else:
            formatted = numeric_text

    elif expected_format == FORMAT_TYPES["DATE"]:
        date_patterns = [
            r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}',
            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',
            r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2}(?:st|nd|rd|th)?,?\s+\d{4}',
            r'\d{1,2}(?:st|nd|rd|th)?\s+(?:of\s+)?(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*,?\s+\d{4}',
        ]
        if not any(re.search(pattern, answer) for pattern in date_patterns):
            error = "Answer is not a valid date"

    elif expected_format == FORMAT_TYPES["BOOLEAN"]:
        verdict = extract_boolean(answer)
        if verdict in ["Yes", "No"]:
            formatted = verdict
        else:
            error = "Answer is not a clear Yes/No"

    elif expected_format == FORMAT_TYPES["LIST"]:
        entries = [entry.strip() for entry in answer.split(',')]
        if len(entries) > 1:
            formatted = ', '.join(entries)
        else:
            error = "Answer is not a comma-separated list"

    elif expected_format == FORMAT_TYPES["ENTITY"]:
        # The whole answer must be one or more capitalised words.
        if not re.match(r'^[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*$', answer):
            error = "Answer is not a valid named entity"

    elif expected_format == FORMAT_TYPES["STRUCTURED"]:
        try:
            json.loads(answer)
        except json.JSONDecodeError:
            error = "Answer is not valid JSON"

    # An answer is valid exactly when no branch recorded an error.
    return {
        "is_valid": error is None,
        "formatted_answer": formatted,
        "error": error,
    }
|
|
|
def process_answer(response: str, expected_format: Optional[str] = None) -> Dict[str, Any]:
    """
    Run the full pipeline on a response: extract, format, then validate.

    Args:
        response: The full response text
        expected_format: Optional expected format type; the text format is
            used when omitted or empty

    Returns:
        Dictionary with the original response, the extracted answer, the
        formatted answer, the format type used, and the validation outcome
        ("is_valid" and "error")
    """
    format_type = expected_format or FORMAT_TYPES["TEXT"]

    extracted = extract_answer(response, format_type)
    formatted = format_answer(extracted, format_type)
    verdict = validate_answer_format(formatted, format_type)

    report = {
        "original_response": response,
        "extracted_answer": extracted,
        "formatted_answer": formatted,
        "format_type": format_type,
    }
    report["is_valid"] = verdict["is_valid"]
    report["error"] = verdict["error"]
    return report